Coverage for src/nendo/library/sqlalchemy_library.py: 76%
694 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-30 09:47 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-30 09:47 +0100
1# -*- encoding: utf-8 -*-
2# ruff: noqa: S603, S607, PLW1510
3"""Module implementing the NendoLibraryPlugin using SQLAlchemy.
5This module implements an SQLAlchemy version of the NendoLibraryPlugin.
6The plugin is not suitable to be used by itself but should be inherited from
7when implementing a new backend for the Nendo Library using SQLAlchemy. An example
8is given by the DuckDB implementation used as the default Nendo Library.
9"""
11from __future__ import annotations
13import logging
14import os
15import pickle
16import subprocess
17import uuid
18from contextlib import contextmanager
19from datetime import datetime
20from tempfile import NamedTemporaryFile
21from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union
23import librosa
24import numpy as np
25import soundfile as sf
26from sqlalchemy import Float, and_, asc, desc, func, or_, true
27from sqlalchemy.orm import Query, Session, joinedload, sessionmaker
28from sqlalchemy.sql.expression import cast
29from sqlalchemy.sql.sqltypes import Text
30from tinytag import TinyTag
32from nendo import schema
33from nendo.library import model
34from nendo.utils import AudioFileUtils, ensure_uuid, md5sum
36if TYPE_CHECKING:
37 from pydantic import DirectoryPath, FilePath
39logger = logging.getLogger("nendo")
42class SqlAlchemyNendoLibrary(schema.NendoLibraryPlugin):
43 """Implementation of the `NendoLibraryPlugin` using SQLAlchemy."""
45 iter_index: int = 0
46 user: schema.NendoUser = None
def __init__(self, **kwargs: Any) -> None:
    """Initialize the library plugin.

    Args:
        **kwargs (Any): Keyword arguments that are forwarded unchanged to the
            initializer of the parent class.
    """
    super().__init__(**kwargs)
    # position of the track that `__next__` hands out next
    self.iter_index = 0
@property
def default_user(self):
    """Build the default Nendo user from the library configuration.

    Returns:
        schema.NendoUser: A verified user whose ID and name are taken from
            `config.user_id` and `config.user_name`.
    """
    user_fields = {
        "id": uuid.UUID(self.config.user_id),
        "name": self.config.user_name,
        "password": "nendo",  # noqa: S106
        "email": "info@okio.ai",
        "verified": True,
    }
    return schema.NendoUser(**user_fields)
@contextmanager
def session_scope(self):
    """Provide a transactional scope around a series of operations.

    A new session bound to `self.db` is created for every call. The session is
    committed when the managed block finishes normally, rolled back when the
    block raises, and closed in either case.

    Yields:
        Session: The freshly created SQLAlchemy session.
    """
    session_factory = sessionmaker(autocommit=False, autoflush=False, bind=self.db)
    session = session_factory()
    try:
        yield session
        session.commit()
    except BaseException:
        # undo partial work, then let the caller see the original error
        session.rollback()
        raise
    finally:
        session.close()
def _disconnect(self):
    """Close the database handle stored in `self.db`."""
    database = self.db
    database.close()
def _ensure_user_uuid(self, user_id: Optional[Union[str, uuid.UUID]] = None):
    """Resolve the user ID to operate on.

    Args:
        user_id (Union[str, UUID], optional): Explicit user ID. When omitted,
            the ID of the library's current user is used.

    Returns:
        The result of `ensure_uuid(user_id)` for an explicit ID, otherwise
        `self.user.id`.
    """
    if user_id is None:
        return self.user.id
    return ensure_uuid(user_id)
def _convert_plugin_data(self, value: Any, user_id: Optional[uuid.UUID] = None):
    """Convert a plugin data value into the string that is stored in the DB.

    Numpy arrays and matrices are written to a temporary `.npy` file, stored
    as a blob via `store_blob`, and represented by the ID of that blob. Every
    other value is converted with `str()`.

    Args:
        value (Any): The plugin data value to convert.
        user_id (UUID, optional): User ID that is passed on to `store_blob`.

    Returns:
        The blob ID as a string for numpy values, otherwise `str(value)`.
        `None` if the string conversion raises.
    """
    if not isinstance(value, (np.ndarray, np.matrix)):
        try:
            return str(value)
        except Exception as e:  # noqa: BLE001
            logger.error("Failed to load plugin data: %s", e)
            return None

    # the temporary file only has to live until the blob has been stored
    with NamedTemporaryFile(suffix=".npy", delete=True) as tmpfile:
        npy_path = tmpfile.name
        np.save(npy_path, value)
        blob = self.store_blob(file_path=npy_path, user_id=user_id)
        return str(blob.id)
99 # ==========================
100 #
101 # TRACK MANAGEMENT FUNCTIONS
102 #
103 # ==========================
def __len__(self):
    """Obtain the number of tracks.

    Returns:
        The row count of a query over all `NendoTrackDB` entries.
    """
    with self.session_scope() as session:
        all_tracks = session.query(model.NendoTrackDB)
        return all_tracks.count()
def __iter__(self):
    """Return the library itself as the iterator over its tracks.

    The iteration position is rewound to the first track. Without the reset,
    `iter_index` would keep the value left behind by a previous loop, so a
    second `for track in library` would start where the first one stopped
    (or yield nothing at all once the library had been exhausted).

    Returns:
        The library instance, positioned at the first track.
    """
    self.iter_index = 0
    return self
def __next__(self):
    """Return the track at the current iteration position and advance.

    Raises:
        StopIteration: If there is no track at offset `iter_index`.

    Returns:
        schema.NendoTrack: The next track.
    """
    with self.session_scope() as session:
        db_track = session.query(model.NendoTrackDB).offset(self.iter_index).first()
        if not db_track:
            raise StopIteration
        self.iter_index += 1
        return schema.NendoTrack.model_validate(db_track)
def _create_track_from_file(
    self,
    file_path: FilePath,
    track_type: str = "track",
    copy_to_library: Optional[bool] = None,
    skip_duplicate: Optional[bool] = None,
    user_id: Optional[uuid.UUID] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> Union[schema.NendoTrackCreate, schema.NendoTrack]:
    """Create a NendoTrack from the file given by file_path.

    Args:
        file_path (Union[FilePath, str]): Path to the file to be added.
        track_type (str, optional): Type of the track. Defaults to "track".
        copy_to_library (bool, optional): Flag that specifies whether
            the file should be copied into the library directory.
            When None, the value of `config.copy_to_library` is used.
        skip_duplicate (bool, optional): Flag that specifies whether a
            file that already exists in the library (based on its file
            checksum) is skipped. When None, the value of
            `config.skip_duplicate` is used.
        user_id (UUID, optional): ID of user adding the track.
        meta (dict, optional): Metadata to attach to the track upon adding.
            The passed dictionary is not modified.

    Raises:
        schema.NendoResourceError: If the file does not exist or has an
            unsupported filetype.
        schema.NendoLibraryError: If converting or copying the file into the
            library fails.

    Returns:
        Union[schema.NendoTrackCreate, schema.NendoTrack]: The track to be
            created, or the already existing track if duplicates are skipped
            and a track with the same checksum was found.
    """
    if not os.path.isfile(file_path):
        raise schema.NendoResourceError("File not found", file_path)

    if not AudioFileUtils().is_supported_filetype(file_path):
        raise schema.NendoResourceError("Unsupported filetype", file_path)

    file_checksum = md5sum(file_path)

    # skip adding a duplicate based on config flag and hashsum of the file;
    # only fall back to the config when the caller did not decide explicitly,
    # so that an explicit False is respected
    if skip_duplicate is None:
        skip_duplicate = self.config.skip_duplicate
    if skip_duplicate:
        tracks = list(self.find_tracks(value=file_checksum))
        if tracks:
            return schema.NendoTrack.model_validate(tracks[0])

    # work on a copy so the caller's dictionary (possibly shared between
    # several files) is never mutated
    meta = dict(meta) if meta else {}

    # gather file metadata
    try:
        # extract ID3 tags
        tags = TinyTag.get(file_path)
        meta.update(tags.as_dict())
    except KeyError as e:
        logger.error("Failed extracting tags from file: %s, Error: %s", file_path, e)

    # convert and save to library
    if copy_to_library is None:
        copy_to_library = self.config.copy_to_library
    # str() keeps this working for pathlib paths; lower() covers ".MP3"
    is_mp3 = str(file_path).lower().endswith(".mp3")
    owner_id = str(user_id) if user_id else str(self.user.id)
    if copy_to_library or (self.config.auto_convert and is_mp3):
        try:
            file_stats = os.stat(file_path)
            sr = None
            if self.config.auto_convert:
                if is_mp3:
                    signal, sr = librosa.load(path=file_path, sr=None, mono=False)
                else:
                    signal, sr = sf.read(file=file_path)
                # librosa.load() puts the channels into the first dimension,
                # sf.read() into the second one
                channels_first = is_mp3 and signal.ndim > 1
                # resample to default rate if required
                if self.config.auto_resample and sr != self.config.default_sr:
                    logger.info(
                        "Auto-converting to SR of %d",
                        self.config.default_sr,
                    )
                    signal = librosa.resample(
                        signal,
                        orig_sr=sr,
                        target_sr=self.config.default_sr,
                        # always resample along the time axis
                        axis=-1 if channels_first else 0,
                    )
                    sr = self.config.default_sr
                # sf.write expects the channels in the second dimension of the
                # signal array
                if channels_first:
                    signal = np.transpose(signal)
                path_in_library = self.storage_driver.save_signal(
                    file_name=self.storage_driver.generate_filename(
                        filetype="wav",
                        user_id=owner_id,
                    ),
                    signal=signal,
                    sr=sr,
                    user_id=owner_id,
                )
            else:
                # save file to storage in its original format
                path_in_library = self.storage_driver.save_file(
                    file_name=self.storage_driver.generate_filename(
                        filetype=os.path.splitext(file_path)[1][1:],  # without dot
                        user_id=owner_id,
                    ),
                    file_path=file_path,
                    user_id=owner_id,
                )

            # save the parsed sample rate
            if sr is not None:
                meta["sr"] = sr

            meta.update(
                {
                    "original_filename": os.path.basename(file_path),
                    "original_filepath": os.path.dirname(file_path),
                    "original_size": file_stats.st_size,
                    "original_checksum": file_checksum,
                },
            )
            location = self.storage_driver.get_driver_location()
        except Exception as e:  # noqa: BLE001
            raise schema.NendoLibraryError(
                f"Error copying file to the library: {e}.",
            ) from e
    else:
        path_in_library = file_path
        location = schema.ResourceLocation.original

    resource = schema.NendoResource(
        file_path=self.storage_driver.get_file_path(
            src=path_in_library,
            user_id=owner_id,
        ),
        file_name=self.storage_driver.get_file_name(
            src=path_in_library,
            user_id=owner_id,
        ),
        resource_type="audio",
        location=location,
        meta=meta,
    )
    return schema.NendoTrackCreate(
        nendo_instance=self.nendo_instance,
        resource=resource.model_dump(),
        user_id=user_id or self.user.id,
        track_type=track_type,
        meta=meta,
    )
def _create_track_from_signal(
    self,
    signal: np.ndarray,
    sr: int,
    track_type: str = "track",
    meta: Optional[Dict[str, Any]] = None,
    user_id: Optional[uuid.UUID] = None,
) -> schema.NendoTrackCreate:
    """Create a NendoTrack from the given signal.

    Args:
        signal (np.ndarray): The numpy array containing the audio signal.
        sr (int): Sample rate
        track_type (str): Track type.
        meta (Dict[str, Any], optional): Track metadata. Defaults to {}.
            The passed dictionary is not modified.
        user_id (UUID, optional): The ID of the user adding the track.

    Raises:
        schema.NendoLibraryError: If resampling or writing the signal to the
            library fails.

    Returns:
        schema.NendoTrackCreate: The created NendoTrack.
    """
    target_file = None
    owner_id = str(user_id) if user_id else str(self.user.id)
    # a 2-D array with at most two rows is treated as channels-first, which is
    # the layout produced by librosa.load()
    channels_first = signal.ndim > 1 and signal.shape[0] <= 2
    try:
        if self.config.auto_resample and sr != self.config.default_sr:
            logger.info("Auto-converting to SR of %d", self.config.default_sr)
            signal = librosa.resample(
                signal,
                orig_sr=sr,
                target_sr=self.config.default_sr,
                # resample along the time axis, never across the channels
                axis=-1 if channels_first else 0,
            )
            sr = self.config.default_sr
        # sf.write expects the channels in the second dimension of the
        # signal array, so transpose only after resampling
        if channels_first:
            signal = np.transpose(signal)
        target_file = self.storage_driver.save_signal(
            file_name=self.storage_driver.generate_filename(
                filetype="wav",
                user_id=owner_id,
            ),
            signal=signal,
            sr=sr,
            user_id=owner_id,
        )
    except Exception as e:  # noqa: BLE001
        raise schema.NendoLibraryError(
            f"Failed writing file {target_file} to the library. Error: {e}."
        ) from e
    resource = schema.NendoResource(
        file_path=self.storage_driver.get_file_path(
            src=target_file,
            user_id=owner_id,
        ),
        file_name=self.storage_driver.get_file_name(
            src=target_file,
            user_id=owner_id,
        ),
        resource_type="audio",
        location=self.storage_driver.get_driver_location(),
    )
    # save sample rate on a copy so the caller's dictionary stays untouched
    meta = dict(meta) if meta else {}
    meta["sr"] = sr
    return schema.NendoTrackCreate(
        resource=resource.model_dump(),
        user_id=user_id or self.user.id,
        track_type=track_type,
        meta=meta,
    )
@schema.NendoPlugin.batch_process
def _add_tracks_db(
    self,
    file_paths: List[FilePath],
    track_type: str = "track",
    copy_to_library: Optional[bool] = None,
    skip_duplicate: bool = True,
    user_id: Optional[uuid.UUID] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> List[schema.NendoTrack]:
    """Add multiple tracks to the library from their file_paths.

    Files that fail with a `NendoLibraryError` are logged and skipped. Files
    that are recognised as duplicates are not inserted again; the already
    existing track is returned in their place.

    Args:
        file_paths (List[FilePath]): A list of full file paths to add as tracks.
        track_type (str, optional): The track_type of the tracks to add.
            Defaults to "track".
        copy_to_library (bool, optional): Flag that specifies whether
            the file should be copied into the library directory.
            Defaults to None.
        skip_duplicate (bool, optional): Flag that specifies whether a
            file that already exists in the library (based on its file
            checksum) is skipped. Defaults to True.
        user_id (UUID, optional): ID of user adding the track.
        meta (dict, optional): Metadata to attach to the track upon adding.

    Returns:
        List[schema.NendoTrack]: The tracks for all successfully processed
            files, in the order of `file_paths`.
    """
    resolved = []
    for fp in file_paths:
        try:
            resolved.append(
                self._create_track_from_file(
                    file_path=fp,
                    track_type=track_type,
                    copy_to_library=copy_to_library,
                    skip_duplicate=skip_duplicate,
                    user_id=user_id or self.user.id,
                    # every file gets its own copy so tags of one file can
                    # never leak into the metadata of the next one
                    meta=dict(meta) if meta else None,
                ),
            )
        except schema.NendoLibraryError as e:
            logger.error("Failed adding file %s. Error: %s", fp, e)

    # duplicates come back as existing NendoTrack objects; only the
    # NendoTrackCreate entries have to be written to the database
    to_create = [t for t in resolved if isinstance(t, schema.NendoTrackCreate)]
    if not to_create:
        return resolved
    with self.session_scope() as session:
        db_tracks = self._upsert_tracks_db(tracks=to_create, session=session)
        inserted = iter([schema.NendoTrack.model_validate(t) for t in db_tracks])
        return [
            next(inserted) if isinstance(t, schema.NendoTrackCreate) else t
            for t in resolved
        ]
def _get_related_tracks_query(
    self,
    track_id: uuid.UUID,
    session: Session,
    user_id: Optional[uuid.UUID] = None,
) -> Query:
    """Build the query for tracks that have a relationship to a given track.

    Tracks are joined to the relationship table on the relationship's source
    ID. A row matches if it belongs to the user and the relationship has the
    given track as its source or its target.

    Args:
        track_id (UUID): ID of the track to be searched for.
        session (Session): Session to be used for the transaction.
        user_id (UUID, optional): The user ID to filter for. Defaults to the
            ID of the library's current user.

    Returns:
        Query: The SQLAlchemy query object.
    """
    user_id = user_id or self.user.id
    relationship = model.TrackTrackRelationshipDB
    owned_by_user = model.NendoTrackDB.user_id == user_id
    touches_track = or_(
        relationship.target_id == track_id,
        relationship.source_id == track_id,
    )
    joined = session.query(model.NendoTrackDB).join(
        relationship,
        model.NendoTrackDB.id == relationship.source_id,
    )
    return joined.filter(and_(owned_by_user, touches_track))
def _upsert_track_track_relationship(
    self, relationship: schema.NendoRelationshipBase, session: Session,
) -> model.TrackTrackRelationshipDB:
    """Insert or replace a track-to-track relationship in the database.

    Args:
        relationship (schema.NendoRelationshipBase): The relationship to upsert.
            A `NendoRelationshipCreate` is inserted as a new row, anything else
            updates the existing row with the same ID.
        session (sqlalchemy.Session): Session object to commit to.

    Raises:
        schema.NendoLibraryError: If an existing relationship should be
            updated but no row with its ID exists.

    Returns:
        model.TrackTrackRelationshipDB: The ORM object of the upserted
            relationship.
    """
    if type(relationship) is schema.NendoRelationshipCreate:
        # creating new relationship
        db_rel = model.TrackTrackRelationshipDB(**relationship.model_dump())
        session.add(db_rel)
    else:
        # updating existing relationship
        db_rel = (
            session.query(model.TrackTrackRelationshipDB)
            .filter_by(id=relationship.id)
            .first()
        )
        if db_rel is None:
            # fail with a clear message instead of an AttributeError on None
            raise schema.NendoLibraryError(
                f"Relationship with ID {relationship.id} not found.",
            )
        db_rel.source_id = relationship.source_id
        db_rel.target_id = relationship.target_id
        db_rel.relationship_type = relationship.relationship_type
        db_rel.meta = relationship.meta
    session.commit()
    return db_rel
def _upsert_track_db(
    self, track: schema.NendoTrackBase, session: Session,
) -> model.NendoTrackDB:
    """Write a track to the DB, inserting or updating it as needed.

    A `NendoTrackCreate` is inserted as a new row. Any other track object
    updates the row that carries the same ID.

    Args:
        track (schema.NendoTrackBase): Track object to be stored.
        session (Session): Session to be used for the transaction.

    Raises:
        schema.NendoTrackNotFoundError: If a track should be updated but no
            row with its ID exists.

    Returns:
        model.NendoTrackDB: The ORM model object of the upserted track.
    """
    if type(track) is not schema.NendoTrackCreate:
        # update existing track
        db_track = (
            session.query(model.NendoTrackDB).filter_by(id=track.id).one_or_none()
        )
        if db_track is None:
            raise schema.NendoTrackNotFoundError("Track not found", id=track.id)
        new_values = {
            "user_id": track.user_id,
            "visibility": track.visibility,
            "resource": track.resource.model_dump(),
            "track_type": track.track_type,
            "images": track.images,
            "meta": track.meta,
        }
        for column, new_value in new_values.items():
            setattr(db_track, column, new_value)
    else:
        # create new track; "nendo_instance" is removed before the remaining
        # fields are handed to the ORM model
        fields = track.model_dump()
        fields.pop("nendo_instance")
        db_track = model.NendoTrackDB(**fields)
        session.add(db_track)
    session.commit()
    return db_track
def _upsert_tracks_db(
    self, tracks: List[schema.NendoTrackCreate], session: Session,
) -> List[model.NendoTrackDB]:
    """Add multiple tracks to the DB in a single commit.

    Every given track is turned into a new ORM object; all objects are added
    to the session together and committed once.

    Args:
        tracks (List[schema.NendoTrackCreate]): Track objects to be created.
        session (Session): Session to be used for the transaction.

    Returns:
        List[model.NendoTrackDB]: The ORM model objects of the added tracks.
    """

    def _to_db_track(track):
        # "nendo_instance" is removed before building the ORM object
        fields = track.model_dump()
        fields.pop("nendo_instance")
        return model.NendoTrackDB(**fields)

    db_tracks = [_to_db_track(track) for track in tracks]
    session.add_all(db_tracks)
    session.commit()
    return db_tracks
def _get_all_plugin_data_db(
    self,
    track_id: uuid.UUID,
    session: Session,
    user_id: Optional[uuid.UUID] = None,
) -> List[model.NendoPluginDataDB]:
    """Get all plugin data related to a track from the DB.

    Args:
        track_id (UUID): Track ID to get the related plugin data for.
        session (Session): SQLAlchemy session. NOTE: the query runs in a
            session of its own, the passed session is not used.
        user_id (UUID, optional): The user ID to filter for. Defaults to the
            ID of the library's current user.

    Returns:
        A list with one validated `schema.NendoPluginData` object per
        matching plugin data entry.
    """
    user_id = user_id or self.user.id
    matches_track = model.NendoPluginDataDB.track_id == track_id
    matches_user = model.NendoPluginDataDB.user_id == user_id
    with self.session_scope() as scoped_session:
        rows = (
            scoped_session.query(model.NendoPluginDataDB)
            .filter(and_(matches_track, matches_user))
            .all()
        )
        validated = []
        for row in rows:
            validated.append(schema.NendoPluginData.model_validate(row))
        return validated
def _get_all_plugin_data_db_by_name(
    self,
    track_id: uuid.UUID,
    plugin_name: str,
    session: Session,
    user_id: Optional[uuid.UUID] = None,
) -> List[schema.NendoPluginData]:
    """Get all plugin data related to a track and a given plugin from the DB.

    Args:
        track_id (uuid.UUID): Track ID to get the related plugin data for.
        plugin_name (str): Name of the plugin to get related data for.
        session (Session): SQLAlchemy session. NOTE: the query runs in a
            session of its own, the passed session is not used.
        user_id (UUID, optional): The user ID to filter for. Defaults to the
            ID of the library's current user.

    Returns:
        List[schema.NendoPluginData]: One validated object per matching
            plugin data entry; empty if there is none.
    """
    user_id = user_id or self.user.id
    with self.session_scope() as scoped_session:
        plugin_data_db = (
            scoped_session.query(model.NendoPluginDataDB)
            .filter(
                and_(
                    model.NendoPluginDataDB.track_id == track_id,
                    model.NendoPluginDataDB.plugin_name == plugin_name,
                    model.NendoPluginDataDB.user_id == user_id,
                ),
            )
            .all()
        )
        # .all() always yields a list, so every entry is validated on its own
        # (validating the list as a single object can never succeed)
        return [
            schema.NendoPluginData.model_validate(pdb) for pdb in plugin_data_db
        ]
def _get_latest_plugin_data_db(
    self,
    track_id: uuid.UUID,
    plugin_name: str,
    plugin_version: str,
    key: str,
    session: Session,
    user_id: Optional[uuid.UUID] = None,
) -> Optional[model.NendoPluginDataDB]:
    """Get the latest plugin data for a given track id, plugin name and data key.

    The lookup runs in the session handed in by the caller, so the returned
    ORM object stays attached to that session and changes made to it are
    persisted when the caller commits.

    Args:
        track_id (uuid.UUID): Track ID to get the related plugin data for.
        plugin_name (str): Name of the plugin to get related data for.
        plugin_version (str): Version of the plugin to get related data for.
        key (str): Key by which to filter the plugin data.
        session (Session): SQLAlchemy session used for the query.
        user_id (UUID, optional): The user ID to filter for. Defaults to the
            ID of the library's current user.

    Returns:
        Optional[model.NendoPluginDataDB]: The most recently updated matching
            plugin data entry, or None if there is none.
    """
    user_id = user_id or self.user.id
    return (
        session.query(model.NendoPluginDataDB)
        .filter(
            and_(
                model.NendoPluginDataDB.track_id == track_id,
                model.NendoPluginDataDB.plugin_name == plugin_name,
                model.NendoPluginDataDB.plugin_version == plugin_version,
                model.NendoPluginDataDB.key == key,
                model.NendoPluginDataDB.user_id == user_id,
            ),
        )
        .order_by(model.NendoPluginDataDB.updated_at.desc())
        .first()
    )
def _insert_plugin_data_db(
    self, plugin_data: schema.NendoPluginDataCreate, session: Session,
) -> model.NendoPluginDataDB:
    """Insert a new plugin data entry into the DB and commit.

    Args:
        plugin_data (schema.NendoPluginDataCreate): The plugin data to insert.
        session (Session): Session to add the entry to and to commit.

    Returns:
        model.NendoPluginDataDB: The ORM object of the inserted entry.
    """
    row_values = plugin_data.model_dump()
    new_row = model.NendoPluginDataDB(**row_values)
    session.add(new_row)
    session.commit()
    return new_row
def _update_plugin_data_db(
    self,
    existing_plugin_data: model.NendoPluginDataDB,
    plugin_data: schema.NendoPluginData,
    session: Session,
) -> model.NendoPluginDataDB:
    """Overwrite key, value and user ID of an existing plugin data entry.

    Args:
        existing_plugin_data (model.NendoPluginDataDB): The entry to update.
        plugin_data (schema.NendoPluginData): Source of the new values.
        session (Session): Session that is committed after the update.

    Returns:
        model.NendoPluginDataDB: The updated entry, or None (after logging an
            error) if no existing entry was given.
    """
    if existing_plugin_data is not None:
        for field in ("key", "value", "user_id"):
            setattr(existing_plugin_data, field, getattr(plugin_data, field))
        session.commit()
        return existing_plugin_data
    logger.error("Plugin data not found!")
    return None
def add_track(
    self,
    file_path: Union[FilePath, str],
    track_type: str = "track",
    copy_to_library: Optional[bool] = None,
    skip_duplicate: Optional[bool] = None,
    user_id: Optional[uuid.UUID] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoTrack:
    """Add the track given by path to the library.

    Args:
        file_path (Union[FilePath, str]): Path to the file to be added.
        track_type (str, optional): Type of the track. Defaults to "track".
        copy_to_library (bool, optional): Flag that specifies whether
            the file should be copied into the library directory.
            Defaults to None.
        skip_duplicate (bool, optional): Flag that specifies whether a
            file that already exists in the library (based on its file
            checksum) is skipped. When None, the value of
            `config.skip_duplicate` is used.
        user_id (UUID, optional): ID of user adding the track.
        meta (dict, optional): Metadata to attach to the track upon adding.

    Returns:
        schema.NendoTrack: The track that was added to the library.
    """
    # only consult the config when the caller left the decision open; using
    # `or` here would silently turn an explicit False into the config value
    if skip_duplicate is None:
        skip_duplicate = self.config.skip_duplicate
    track = self._create_track_from_file(
        file_path=file_path,
        track_type=track_type,
        copy_to_library=copy_to_library,
        skip_duplicate=skip_duplicate,
        user_id=user_id or self.user.id,
        meta=meta,
    )
    if track is not None:
        with self.session_scope() as session:
            db_track = self._upsert_track_db(track=track, session=session)
            # validate while the session is still open
            track = schema.NendoTrack.model_validate(db_track)
    return track
def add_track_from_signal(
    self,
    signal: np.ndarray,
    sr: int,
    track_type: str = "track",
    user_id: Optional[uuid.UUID] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoTrack:
    """Store the given signal as a new track in the library.

    Args:
        signal (np.ndarray): The numpy array containing the audio signal.
        sr (int): Sample rate
        track_type (str): Track type.
        user_id (UUID, optional): The ID of the user adding the track.
            Defaults to the ID of the library's current user.
        meta (Dict[str, Any], optional): Track metadata. Defaults to {}.

    Returns:
        schema.NendoTrack: The added NendoTrack
    """
    owner_id = user_id or self.user.id
    track = self._create_track_from_signal(
        signal=signal,
        sr=sr,
        track_type=track_type,
        meta=meta,
        user_id=owner_id,
    )
    if track is None:
        return track
    with self.session_scope() as session:
        stored = self._upsert_track_db(track=track, session=session)
        track = schema.NendoTrack.model_validate(stored)
    return track
def add_tracks(
    self,
    path: Union[str, DirectoryPath],
    track_type: str = "track",
    user_id: Optional[Union[str, uuid.UUID]] = None,
    copy_to_library: Optional[bool] = None,
    skip_duplicate: bool = True,
) -> schema.NendoCollection:
    """Scan the provided path and add all supported audio files to the library.

    The directory is walked recursively. The added tracks are grouped into a
    new collection that is named after the scanned path.

    Args:
        path (Union[str, DirectoryPath]): Path to the directory to be scanned
        track_type (str, optional): Track type for the new tracks
        user_id (UUID, optional): The ID of the user adding the track.
        copy_to_library (bool): Copy and convert the data into the nendo Library?
        skip_duplicate (bool): Skip adding duplicates?

    Raises:
        schema.NendoLibraryError: If the given path does not exist.

    Returns:
        schema.NendoCollection: The collection created via `add_collection`
            from the IDs of the added tracks.
    """
    user_id = self._ensure_user_uuid(user_id)
    if not os.path.exists(path):
        raise schema.NendoLibraryError(f"Source directory {path} does not exist.")
    audio_files = []
    for root, _, files in os.walk(path):
        for file in files:
            if AudioFileUtils().is_supported_filetype(file):
                audio_files.append(os.path.join(root, file))
    added_tracks = self._add_tracks_db(
        file_paths=audio_files,
        track_type=track_type,
        copy_to_library=copy_to_library,
        skip_duplicate=skip_duplicate,
        user_id=user_id,
    )
    added_ids = [t.id for t in added_tracks]
    return self.add_collection(name=path, track_ids=added_ids)
def add_track_relationship(
    self,
    track_one_id: Union[str, uuid.UUID],
    track_two_id: Union[str, uuid.UUID],
    relationship_type: str = "relationship",
    meta: Optional[Dict[str, Any]] = None,
) -> bool:
    """Add a bidirectional relationship between two existing tracks.

    Args:
        track_one_id (Union[str, UUID]): ID of the first track.
        track_two_id (Union[str, UUID]): ID of the second track.
        relationship_type (str): Type of the relationship.
            Defaults to "relationship".
        meta (Dict[str, Any], optional): Metadata stored with both directions
            of the relationship.

    Returns:
        bool: True if the relationship was stored. False if both IDs refer to
            the same track or one of the tracks does not exist.
    """
    # normalize first: otherwise a str and a UUID that refer to the very same
    # track would compare as different
    track_one_id = ensure_uuid(track_one_id)
    track_two_id = ensure_uuid(track_two_id)

    # check that tracks are not the same
    if track_one_id == track_two_id:
        logger.error("Error must provide two different existing track ids")
        return False

    # check if tracks exist
    track_one = self.get_track(track_one_id)  # track
    if track_one is None:
        logger.error("Error track id %s not found", str(track_one_id))
        return False

    track_two = self.get_track(track_two_id)  # related
    if track_two is None:
        logger.error("Error track id %s not found", str(track_two_id))
        return False

    # create bidirectional relationship
    relationship_from = schema.NendoRelationshipCreate(
        source_id=track_one.id,
        target_id=track_two.id,
        relationship_type=relationship_type,
        meta=meta or {},
    )
    relationship_to = schema.NendoRelationshipCreate(
        source_id=track_two.id,
        target_id=track_one.id,
        relationship_type=relationship_type,
        meta=meta or {},
    )
    with self.session_scope() as session:
        relationship_from = schema.NendoRelationship.model_validate(
            self._upsert_track_track_relationship(
                relationship=relationship_from, session=session,
            ),
        )
        _ = schema.NendoRelationship.model_validate(
            self._upsert_track_track_relationship(
                relationship=relationship_to, session=session,
            ),
        )
    # avoid redundancy; only append one direction
    track_one.related_tracks.append(relationship_from)
    return True
def add_related_track(
    self,
    file_path: Union[str, FilePath],
    related_track_id: Union[str, uuid.UUID],
    track_type: str = "track",
    user_id: Optional[Union[str, uuid.UUID]] = None,
    track_meta: Optional[Dict[str, Any]] = None,
    relationship_type: str = "relationship",
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoTrack:
    """Add a track from a file and relate it to another track.

    Args:
        file_path (Union[str, FilePath]): Path to the file to be added.
        related_track_id (Union[str, UUID]): ID of the track the new track is
            related to.
        track_type (str): Type of the new track. Defaults to "track".
        user_id (Union[str, UUID], optional): ID of the user adding the track.
        track_meta (Dict[str, Any], optional): Metadata of the new track.
        relationship_type (str): Type of the relationship.
            Defaults to "relationship".
        meta (Dict[str, Any], optional): Metadata of the relationship.

    Returns:
        schema.NendoTrack: The added track, with the relationship pointing to
            the related track appended to its `related_tracks`.
    """
    user_id = self._ensure_user_uuid(user_id=user_id)
    related_track_id = ensure_uuid(related_track_id)
    track = self.add_track(
        file_path=file_path, track_type=track_type, user_id=user_id, meta=track_meta,
    )
    # one relationship per direction: new track -> related, related -> new track
    forward = schema.NendoRelationshipCreate(
        source_id=track.id,
        target_id=related_track_id,
        relationship_type=relationship_type,
        meta=meta or {},
    )
    backward = schema.NendoRelationshipCreate(
        source_id=related_track_id,
        target_id=track.id,
        relationship_type=relationship_type,
        meta=meta or {},
    )
    with self.session_scope() as session:
        stored_forward = schema.NendoRelationship.model_validate(
            self._upsert_track_track_relationship(
                relationship=forward, session=session,
            ),
        )
        schema.NendoRelationship.model_validate(
            self._upsert_track_track_relationship(
                relationship=backward, session=session,
            ),
        )
    # avoid redundancy; only append one direction
    track.related_tracks.append(stored_forward)
    return track
def add_related_track_from_signal(
    self,
    signal: np.ndarray,
    sr: int,
    related_track_id: Union[str, uuid.UUID],
    track_type: str = "track",
    user_id: Optional[uuid.UUID] = None,
    track_meta: Optional[Dict[str, Any]] = None,
    relationship_type: str = "relationship",
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoTrack:
    """Add a track from a signal and relate it to another track.

    Args:
        signal (np.ndarray): The numpy array containing the audio signal.
        sr (int): Sample rate
        related_track_id (Union[str, UUID]): ID of the track the new track is
            related to.
        track_type (str): Type of the new track. Defaults to "track".
        user_id (UUID, optional): ID of the user adding the track. Defaults
            to the ID of the library's current user.
        track_meta (Dict[str, Any], optional): Metadata of the new track.
        relationship_type (str): Type of the relationship.
            Defaults to "relationship".
        meta (Dict[str, Any], optional): Metadata of the relationship.

    Returns:
        schema.NendoTrack: The added track, with the relationship pointing to
            the related track appended to its `related_tracks`.
    """
    user_id = user_id or self.user.id
    related_track_id = ensure_uuid(related_track_id)
    track = self.add_track_from_signal(
        signal=signal,
        sr=sr,
        track_type=track_type,
        # the new track must belong to the requested user
        user_id=user_id,
        meta=track_meta,
    )
    # create bidirectional relationship
    relationship_from = schema.NendoRelationshipCreate(
        source_id=track.id,
        target_id=related_track_id,
        relationship_type=relationship_type,
        meta=meta or {},
    )
    # reverse direction: the related track points back to the new track
    relationship_to = schema.NendoRelationshipCreate(
        source_id=related_track_id,
        target_id=track.id,
        relationship_type=relationship_type,
        meta=meta or {},
    )
    with self.session_scope() as session:
        relationship_from = schema.NendoRelationship.model_validate(
            self._upsert_track_track_relationship(
                relationship=relationship_from, session=session,
            ),
        )
        _ = schema.NendoRelationship.model_validate(
            self._upsert_track_track_relationship(
                relationship=relationship_to, session=session,
            ),
        )
    # avoid redundancy; only append one direction
    track.related_tracks.append(relationship_from)
    return track
def update_track(
    self,
    track: schema.NendoTrack,
) -> schema.NendoTrack:
    """Updates the given track by storing it to the database.

    Args:
        track (NendoTrack): The track to be stored to the database.

    Raises:
        NendoTrackNotFoundError: If the track passed to the function
            does not exist in the database.

    Returns:
        NendoTrack: The updated track.
    """
    with self.session_scope() as session:
        # update existing track
        db_track = (
            session.query(model.NendoTrackDB).filter_by(id=track.id).one_or_none()
        )
        if db_track is None:
            raise schema.NendoTrackNotFoundError("Track not found", id=track.id)
        db_track.user_id = track.user_id
        db_track.visibility = track.visibility
        db_track.resource = track.resource.model_dump()
        db_track.track_type = track.track_type
        db_track.images = track.images
        db_track.meta = track.meta
        session.commit()
        # convert while the session is still open: once the scope is left the
        # ORM object is detached and its expired attributes cannot be loaded
        return schema.NendoTrack.model_validate(db_track)
def add_plugin_data(
    self,
    track_id: Union[str, uuid.UUID],
    plugin_name: str,
    plugin_version: str,
    key: str,
    value: Any,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    replace: bool = False,
) -> schema.NendoPluginData:
    """Add plugin data to a NendoTrack and persist changes into the DB.

    Args:
        track_id (Union[str, UUID]): ID of the track to which
            the plugin data should be added.
        plugin_name (str): Name of the plugin.
        plugin_version (str): Version of the plugin.
        key (str): Key under which to save the data.
        value (Any): Data to save.
        user_id (Union[str, UUID], optional): ID of user adding the plugin data.
        replace (bool, optional): Flag that determines whether
            the last existing data point for the given plugin name and -version
            is overwritten or not. Defaults to False.

    Returns:
        NendoPluginData: The saved plugin data as a NendoPluginData object.
    """
    # create plugin data
    user_id = self._ensure_user_uuid(user_id)
    track_id = ensure_uuid(track_id)
    value_converted = self._convert_plugin_data(value=value, user_id=user_id)
    plugin_data = schema.NendoPluginDataCreate(
        track_id=track_id,
        user_id=ensure_uuid(user_id),
        plugin_name=plugin_name,
        plugin_version=plugin_version,
        key=key,
        value=value_converted,
    )
    with self.session_scope() as session:
        existing_plugin_data = None
        if replace:
            # look up the entry of the very same user and track that the new
            # data belongs to
            existing_plugin_data = self._get_latest_plugin_data_db(
                track_id=track_id,
                plugin_name=plugin_name,
                plugin_version=plugin_version,
                key=key,
                session=session,
                user_id=user_id,
            )

        if existing_plugin_data is not None:
            db_plugin_data = self._update_plugin_data_db(
                existing_plugin_data=existing_plugin_data,
                plugin_data=plugin_data,
                session=session,
            )
        else:
            # nothing to replace (or replacing not requested): insert
            db_plugin_data = self._insert_plugin_data_db(
                plugin_data=plugin_data, session=session,
            )
        return schema.NendoPluginData.model_validate(db_plugin_data)
def get_track(self, track_id: uuid.UUID) -> schema.NendoTrack:
    """Get a single track from the library by ID.

    Args:
        track_id (UUID): ID of the track to look up.

    Returns:
        schema.NendoTrack: The track with the given ID, or None if no such
            track exists.
    """
    with self.session_scope() as session:
        by_id = session.query(model.NendoTrackDB).filter(
            model.NendoTrackDB.id == track_id,
        )
        track_db = by_id.one_or_none()
        if track_db is None:
            return None
        return schema.NendoTrack.model_validate(track_db)
@schema.NendoPlugin.stream_output
def get_tracks(
    self,
    query: Optional[Query] = None,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: str = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    session: Optional[Session] = None,
) -> Union[List, Iterator]:
    """Yield tracks matching the given query parameters.

    Args:
        query (Optional[Query]): Query object to build from. If omitted, a
            query over all tracks owned by the user is created.
        user_id (Union[str, UUID], optional): ID of user to filter tracks by.
        order_by (Optional[str]): Key used for ordering the results. The
            special value "random" shuffles the results.
        order (Optional[str]): Order in which to retrieve results
            ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results
            (requires limit).
        session (sqlalchemy.Session): Session to use. If omitted, a new
            session scope is opened.

    Returns:
        Union[List, Iterator]: List or generator of tracks, depending on the
            configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    scope = session or self.session_scope()
    with scope as session_local:
        tracks_query = query
        if not tracks_query:
            tracks_query = session_local.query(model.NendoTrackDB).filter(
                model.NendoTrackDB.user_id == user_id,
            )

        if order_by:
            if order_by == "random":
                ordering = func.random()
            else:
                column = getattr(model.NendoTrackDB, order_by)
                ordering = desc(column) if order == "desc" else asc(column)
            tracks_query = tracks_query.order_by(ordering)

        if limit:
            tracks_query = tracks_query.limit(limit)
            # the offset is only honored in combination with a limit
            if offset:
                tracks_query = tracks_query.offset(offset)

        chunk_size = self.config.stream_chunk_size
        if chunk_size > 1:
            # emit the tracks in lists of `chunk_size` elements
            pending = []
            for track_db in tracks_query:
                pending.append(schema.NendoTrack.model_validate(track_db))
                if len(pending) == chunk_size:
                    yield pending
                    pending = []
            if pending:  # trailing, partially filled chunk
                yield pending
        else:
            # emit the tracks one by one
            for track_db in tracks_query:
                yield schema.NendoTrack.model_validate(track_db)
def get_related_tracks(
    self,
    track_id: Union[str, uuid.UUID],
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: Optional[str] = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Union[List, Iterator]:
    """Retrieve the tracks that have a relationship to the given track.

    Args:
        track_id (str): ID of the track whose related tracks are requested.
        user_id (Union[str, UUID], optional): The user ID to filter for.
        order_by (Optional[str]): Key used for ordering the results.
        order (Optional[str]): Order in which to retrieve results
            ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results
            (requires limit).

    Returns:
        Union[List, Iterator]: List or generator of tracks, depending on the
            configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    with self.session_scope() as session:
        related_query = self._get_related_tracks_query(
            track_id=ensure_uuid(track_id),
            session=session,
            user_id=user_id,
        )
        # ordering and pagination are delegated to get_tracks()
        paging = {
            "order_by": order_by,
            "order": order,
            "limit": limit,
            "offset": offset,
        }
        return self.get_tracks(query=related_query, session=session, **paging)
def find_tracks(
    self,
    value: str,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: str = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Union[List, Iterator]:
    """Search the user's tracks for a text fragment.

    Args:
        value (str): The value to search for. It is matched
            case-insensitively as a substring against the text
            representations of the track's `meta` and `resource` fields.
        user_id (Union[str, UUID], optional): The user ID to filter for.
        order_by (Optional[str]): Key used for ordering the results.
        order (str): Order in which to retrieve results ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results
            (requires limit).

    Returns:
        Union[List, Iterator]: List or generator of tracks, depending on the
            configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    pattern = f"%{value}%"
    with self.session_scope() as session:
        track_db = model.NendoTrackDB
        text_match = or_(
            cast(track_db.resource, Text()).ilike(pattern),
            cast(track_db.meta, Text()).ilike(pattern),
        )
        owned_by_user = track_db.user_id == user_id
        search_query = session.query(track_db).filter(
            and_(text_match, owned_by_user),
        )
        return self.get_tracks(
            query=search_query,
            order_by=order_by,
            order=order,
            limit=limit,
            offset=offset,
            session=session,
        )
def filter_tracks(
    self,
    filters: Optional[Dict[str, Any]] = None,
    resource_filters: Optional[Dict[str, Any]] = None,
    track_type: Optional[Union[str, List[str]]] = None,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    collection_id: Optional[Union[str, uuid.UUID]] = None,
    plugin_names: Optional[List[str]] = None,
    order_by: Optional[str] = None,
    order: str = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    session: Optional[Session] = None,
) -> Union[List, Iterator]:
    """Obtain tracks from the db by filtering w.r.t. various fields.

    Args:
        filters (Optional[dict]): Plugin data filters, keyed by plugin data
            key. The kind of match depends on the type of each value:
            a tuple `(low, high)` is an inclusive numeric range, a list is a
            multiselect (value must equal one of the stringified entries),
            and anything else is a case-insensitive substring match.
            Entries whose value is None are skipped. Defaults to None.
        resource_filters (dict): Dictionary whose list-valued entries contain
            the tokens to search for in the text representation of the
            track's `resource` field (a track matches if any token matches).
            Values that are not lists are ignored and the keys currently
            have no effect. Defaults to None.
        track_type (Union[str, List[str]], optional): Track type to filter for.
            Can be a singular type or a list of types. Defaults to None.
        user_id (Union[str, UUID], optional): The user ID to filter for.
        collection_id (Union[str, uuid.UUID], optional): Collection id to
            which the filtered tracks must have a relationship.
            Defaults to None.
        plugin_names (list, optional): List used for applying the filter only
            to data of certain plugins. If None, all plugin data related to
            the track is used for filtering.
        order_by (str, optional): Key used for ordering the results.
        order (str, optional): Ordering ("asc" vs "desc"). Defaults to "asc".
        limit (int, optional): Limit the number of returned results.
        offset (int, optional): Offset into the paginated results
            (requires limit).
        session (sqlalchemy.Session, optional): Session to use. If omitted,
            a new session scope is opened.

    Returns:
        Union[List, Iterator]: List or generator of tracks, depending on the
            configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    s = session or self.session_scope()
    with s as session_local:
        query = session_local.query(model.NendoTrackDB).filter(
            model.NendoTrackDB.user_id == user_id,
        )

        # restrict plugin data filters to the named plugins, if any
        plugin_name_condition = true()
        if isinstance(plugin_names, list) and plugin_names:
            plugin_name_condition = model.NendoPluginDataDB.plugin_name.in_(
                plugin_names,
            )

        # apply track type filter if applicable
        if track_type is not None:
            if isinstance(track_type, list):
                query = query.filter(model.NendoTrackDB.track_type.in_(track_type))
            else:
                query = query.filter(model.NendoTrackDB.track_type == track_type)

        # apply collection filter if applicable
        if collection_id is not None:
            collection_id = ensure_uuid(collection_id)
            query = query.join(
                model.TrackCollectionRelationshipDB,
                model.NendoTrackDB.id
                == model.TrackCollectionRelationshipDB.source_id,
            ).filter(
                model.TrackCollectionRelationshipDB.target_id == collection_id,
            )

        # apply resource filters if applicable
        if resource_filters:
            token_lists = [
                v for v in resource_filters.values() if isinstance(v, list)
            ]
            or_filter = or_(
                *(
                    cast(model.NendoTrackDB.resource, Text()).ilike(
                        f"%{token!s}%",
                    )
                    for tokens in token_lists
                    for token in tokens
                ),
            )
            query = query.filter(or_filter)

        # apply plugin data filters
        if filters is not None:
            for k, v in filters.items():
                if v is None:
                    continue

                if isinstance(v, tuple):
                    # range: inclusive numeric comparison on both ends
                    value_conditions = [
                        cast(model.NendoPluginDataDB.value, Float)
                        >= cast(v[0], Float),
                        cast(model.NendoPluginDataDB.value, Float)
                        <= cast(v[1], Float),
                    ]
                elif isinstance(v, list):
                    # multiselect: stored value must equal one of the options
                    value_conditions = [
                        model.NendoPluginDataDB.value.in_([str(vi) for vi in v]),
                    ]
                else:
                    # fuzzy match: case-insensitive substring search
                    value_conditions = [
                        cast(model.NendoPluginDataDB.value, Text()).ilike(
                            f"%{v!s}%",
                        ),
                    ]

                query = query.filter(
                    model.NendoTrackDB.plugin_data.any(
                        and_(
                            plugin_name_condition,
                            model.NendoPluginDataDB.key == k,
                            *value_conditions,
                        ),
                    ),
                )

        return self.get_tracks(
            query=query,
            order_by=order_by,
            order=order,
            limit=limit,
            offset=offset,
            session=session,
        )
def remove_track(
    self,
    track_id: Union[str, uuid.UUID],
    remove_relationships: bool = False,
    remove_plugin_data: bool = True,
    remove_resources: bool = True,
    user_id: Optional[Union[str, uuid.UUID]] = None,
) -> bool:
    """Delete track from library by ID.

    Args:
        track_id (Union[str, uuid.UUID]): The ID of the track to remove
        remove_relationships (bool):
            If False prevent deletion if related tracks exist,
            if True delete relationships together with the object
        remove_plugin_data (bool):
            If False prevent deletion if related plugin data exist
            if True delete plugin data together with the object
        remove_resources (bool):
            If False, keep the related resources, e.g. files
            if True, delete the related resources
        user_id (Union[str, UUID], optional): The ID of the user owning
            the track.

    Returns:
        success (bool): True if removal was successful, False otherwise
            (including the case that no track with the given ID exists).
    """
    user_id = self._ensure_user_uuid(user_id)
    track_id = ensure_uuid(track_id)
    with self.session_scope() as session:
        tracks_with_relations = self._get_related_tracks_query(
            track_id=track_id, session=session, user_id=user_id,
        ).all()
        collections_with_relations = (
            session.query(model.NendoCollectionDB)
            .join(
                model.TrackCollectionRelationshipDB,
                model.NendoCollectionDB.id
                == model.TrackCollectionRelationshipDB.target_id,
            )
            .filter(model.TrackCollectionRelationshipDB.source_id == track_id)
            .all()
        )
        related_plugin_data = self._get_all_plugin_data_db(
            track_id=track_id, session=session,
        )
        if len(related_plugin_data) > 0:
            if not remove_plugin_data:
                logger.warning(
                    "Cannot remove due to %d existing "
                    "plugin data entries. Set `remove_plugin_data=True` "
                    "to remove them.", len(related_plugin_data),
                )
                return False
            logger.info("Removing %d plugin data", len(related_plugin_data))
            session.query(model.NendoPluginDataDB).filter(
                model.NendoPluginDataDB.track_id == track_id,
            ).delete()
            session.commit()
        n_rel = len(tracks_with_relations) + len(collections_with_relations)
        if n_rel > 0:
            if not remove_relationships:
                logger.warning(
                    "Cannot remove due to %s existing relationships. "
                    "Set `remove_relationships=True` to remove them.",
                    n_rel,
                )
                return False
            # drop track-to-track relationships in both directions
            session.query(model.TrackTrackRelationshipDB).filter(
                model.TrackTrackRelationshipDB.target_id == track_id,
            ).delete()
            session.query(model.TrackTrackRelationshipDB).filter(
                model.TrackTrackRelationshipDB.source_id == track_id,
            ).delete()
            # remove track from all collections
            for collection in collections_with_relations:
                self._remove_track_from_collection_db(
                    track_id=track_id,
                    collection_id=collection.id,
                    session=session,
                )
            session.commit()
        # delete actual target
        target = (
            session.query(model.NendoTrackDB)
            .filter(model.NendoTrackDB.id == track_id)
            .first()
        )
        if target is None:
            # session.delete(None) would raise; report the failure instead
            logger.warning(
                "Cannot remove track %s: track not found.", str(track_id),
            )
            return False
        session.delete(target)
        # only delete if file has been copied to the library
        # ("original_filepath" is present)
        if (
            remove_resources
            and "original_filepath"
            in schema.NendoTrack.model_validate(target).resource.meta
        ):
            logger.info("Removing resources associated with Track %s", str(track_id))
            return self.storage_driver.remove_file(
                file_name=target.resource["file_name"],
                user_id=str(user_id) if user_id else str(self.user.id),
            )
    return True
def export_track(
    self,
    track_id: Union[str, uuid.UUID],
    file_path: str,
    file_format: str = "wav",
) -> str:
    """Export the track to a file.

    Args:
        track_id (Union[str, uuid.UUID]): The ID of the target track to export.
        file_path (str): Path to the exported file. Can be either a full
            file path or a directory path. If a directory path is given,
            a filename will be automatically generated and the file will be
            exported to the format specified as file_format. If a full file
            path is given, the format will be deduced from the path and the
            file_format parameter will be ignored.
        file_format (str, optional): Format of the exported track. Ignored if
            file_path is a full file path. Defaults to "wav".

    Raises:
        ValueError: If the resulting file format is not one of
            'wav', 'mp3' or 'ogg'.

    Returns:
        str: The path to the exported file.
    """
    track = self.get_track(track_id=track_id)
    # Check if file_path is a directory
    if os.path.isdir(file_path):
        # Generate a filename with timestamp
        if track.has_meta("original_filename"):
            original_filename = track.get_meta("original_filename")
        else:
            original_filename = track.resource.file_name
        file_name = (
            f"{original_filename}_nendo_"
            f"{datetime.now().strftime('%Y%m%d%H%M%S')}"  # noqa: DTZ005
            f".{file_format}"
        )
        file_path = os.path.join(file_path, file_name)
    else:
        # Deduce file format from file extension
        file_format = os.path.splitext(file_path)[1].lstrip(".")

    # Reject unsupported formats before loading any audio data
    if file_format not in ("wav", "ogg", "mp3"):
        raise ValueError(
            "Unsupported file format. Supported formats are 'wav', 'mp3', and 'ogg'.",
        )

    # Exporting the audio
    signal = track.signal
    signal = np.transpose(signal) if signal.shape[0] <= 2 else signal
    if file_format in ("wav", "ogg"):
        sf.write(file_path, signal, track.sr, format=file_format)
        return file_path

    # mp3: write an intermediate WAV to a unique temporary file, so that a
    # pre-existing "<name>.wav" next to the target is never overwritten or
    # deleted, then convert it with ffmpeg.
    with NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
        temp_path = temp_file.name
    try:
        sf.write(temp_path, signal, track.sr)
        subprocess.run(
            ["ffmpeg", "-i", temp_path, "-acodec", "libmp3lame", file_path],
        )
    finally:
        # Clean up the temporary file even if writing or converting failed
        if os.path.exists(temp_path):
            os.remove(temp_path)

    return file_path
1456 # ===============================
1457 #
1458 # COLLECTION MANAGEMENT FUNCTIONS
1459 #
1460 # ===============================
def _get_related_collections_query(
    self,
    collection_id: uuid.UUID,
    session: Session,
    user_id: Optional[uuid.UUID] = None,
) -> Query:
    """Build the query for all collections related to a given collection.

    The query selects the user's collections that are the source of a
    collection-to-collection relationship targeting `collection_id`.

    Args:
        collection_id (UUID): ID of the collection to be searched for.
        session (Session): Session to be used for the transaction.
        user_id (UUID, optional): The user ID to filter for. Falls back to
            the ID of the library's current user.

    Returns:
        Query: The SQLAlchemy query object.
    """
    collection_db = model.NendoCollectionDB
    relationship_db = model.CollectionCollectionRelationshipDB
    owner_id = user_id or self.user.id

    # eagerly load the collections' tracks together with the collections
    eager_tracks = joinedload(collection_db.related_tracks).joinedload(
        model.TrackCollectionRelationshipDB.source,
    )
    owned = session.query(collection_db).filter(collection_db.user_id == owner_id)
    joined = owned.options(eager_tracks).join(
        relationship_db,
        collection_db.id == relationship_db.source_id,
    )
    return joined.filter(relationship_db.target_id == collection_id)
def _remove_track_from_collection_db(
    self,
    track_id: uuid.UUID,
    collection_id: uuid.UUID,
    session: Session,
) -> bool:
    """Deletes a relationship from the track to the collection in the db.

    The positions of the relationships that come after the removed one in
    the same collection are decremented by one.

    Args:
        collection_id (uuid.UUID): Collection id.
        track_id (uuid.UUID): Track id.
        session (sqlalchemy.Session): Session object

    Raises:
        schema.NendoRelationshipNotFoundError: If the track has no
            relationship to the collection.

    Returns:
        success (bool): True if removal was successful.
    """
    rel_db = model.TrackCollectionRelationshipDB
    target_relationship = (
        session.query(rel_db)
        .filter(
            and_(
                rel_db.source_id == track_id,
                rel_db.target_id == collection_id,
            ),
        )
        .first()
    )
    if target_relationship is None:
        raise schema.NendoRelationshipNotFoundError(
            "Relationship not found", track_id, collection_id,
        )
    # remember the position before the row is scheduled for deletion
    removed_position = target_relationship.relationship_position
    session.delete(target_relationship)
    # Close the gap: shift all subsequent relationships up by one position
    session.query(rel_db).filter(
        rel_db.target_id == collection_id,
        rel_db.relationship_position > removed_position,
    ).update({rel_db.relationship_position: rel_db.relationship_position - 1})
    return True
def add_collection(
    self,
    name: str,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    track_ids: Optional[List[Union[str, uuid.UUID]]] = None,
    description: str = "",
    collection_type: str = "collection",
    visibility: schema.Visibility = schema.Visibility.private,
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoCollection:
    """Creates a new collection and saves it into the DB.

    Args:
        name (str): Name of the collection.
        user_id (UUID, optional): The ID of the user adding the collection.
        track_ids (List[Union[str, uuid.UUID]]): List of track ids
            to be added to the collection. Tracks are positioned in the
            collection in the order given here. IDs that do not belong to a
            track of the user are skipped.
        description (str): Description of the collection.
        collection_type (str): Type of the collection.
        visibility (schema.Visibility): Visibility of the collection.
            Defaults to private.
        meta (Dict[str, Any]): Metadata of the collection.

    Returns:
        schema.NendoCollection: The newly created NendoCollection object.
    """
    user_id = self._ensure_user_uuid(user_id)
    requested_ids = [ensure_uuid(t) for t in (track_ids or [])]
    with self.session_scope() as session:
        # Fetch the track objects
        track_objs = (
            session.query(model.NendoTrackDB)
            .filter(
                and_(
                    model.NendoTrackDB.id.in_(requested_ids),
                    model.NendoTrackDB.user_id == user_id,
                ),
            )
            .all()
        )
        # An IN query returns rows in arbitrary order; restore the order in
        # which the caller listed the tracks (first occurrence wins).
        requested_rank = {}
        for rank, requested_id in enumerate(requested_ids):
            requested_rank.setdefault(requested_id, rank)
        track_objs = sorted(
            track_objs,
            key=lambda t: requested_rank.get(ensure_uuid(t.id), len(requested_rank)),
        )

        # Create a new collection object
        new_collection = model.NendoCollectionDB(
            name=name,
            user_id=user_id,
            description=description,
            collection_type=collection_type,
            visibility=visibility,
            meta=meta or {},
        )
        session.add(new_collection)
        # commit first so that the collection's ID is available below
        session.commit()
        session.refresh(new_collection)
        # Create relationships from tracks to collection
        for idx, track in enumerate(track_objs):
            tc_relationship = model.TrackCollectionRelationshipDB(
                source_id=track.id,
                target_id=new_collection.id,
                relationship_type="track",
                meta={},
                relationship_position=idx,
            )
            session.add(tc_relationship)
        session.commit()
        session.refresh(new_collection)

        return schema.NendoCollection.model_validate(new_collection)
def add_related_collection(
    self,
    track_ids: List[Union[str, uuid.UUID]],
    collection_id: Union[str, uuid.UUID],
    name: str,
    description: str = "",
    user_id: Optional[Union[str, uuid.UUID]] = None,
    relationship_type: str = "relationship",
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoCollection:
    """Adds a new collection with a relationship to the given collection.

    Args:
        track_ids (List[Union[str, uuid.UUID]]): List of track ids.
        collection_id (Union[str, uuid.UUID]): Existing collection id.
        name (str): Name of the new related collection.
        description (str): Description of the new related collection.
        user_id (UUID, optional): The ID of the user adding the collection.
        relationship_type (str): Type of the relationship. Also used as the
            collection type of the new collection.
        meta (Dict[str, Any]): Meta of the new related collection.

    Raises:
        schema.NendoCollectionNotFoundError: If no collection with the ID
            `collection_id` exists. No new collection is created in
            that case.

    Returns:
        schema.NendoCollection: The newly added NendoCollection object.
    """
    user_id = self._ensure_user_uuid(user_id)
    if isinstance(collection_id, str):
        collection_id = uuid.UUID(collection_id)

    # Verify the target exists BEFORE creating anything, so that a wrong
    # collection_id does not leave an orphaned new collection behind.
    with self.session_scope() as session:
        target_exists = (
            session.query(model.NendoCollectionDB)
            .filter_by(id=collection_id)
            .first()
            is not None
        )
    if not target_exists:
        raise schema.NendoCollectionNotFoundError(
            "Collection not found", id=collection_id,
        )

    # Create a new collection
    new_collection = self.add_collection(
        name=name,
        user_id=user_id,
        track_ids=track_ids,
        description=description,
        collection_type=relationship_type,
        meta=meta,
    )

    with self.session_scope() as session:
        # create bidirectional relationship
        relationship_from = schema.NendoRelationshipCreate(
            source_id=new_collection.id,
            target_id=collection_id,
            relationship_type=relationship_type,
            meta=meta or {},
        )
        relationship_to = schema.NendoRelationshipCreate(
            source_id=collection_id,
            target_id=new_collection.id,
            relationship_type=relationship_type,
            meta=meta or {},
        )
        relationship_from = model.CollectionCollectionRelationshipDB(
            **relationship_from.model_dump(),
        )
        relationship_to = model.CollectionCollectionRelationshipDB(
            **relationship_to.model_dump(),
        )
        session.add(relationship_from)
        session.add(relationship_to)

        # reflect the new relationship on the returned object as well
        new_collection.related_collections.append(relationship_from)
    return new_collection
def add_track_to_collection(
    self,
    track_id: Union[str, uuid.UUID],
    collection_id: Union[str, uuid.UUID],
    position: Optional[int] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> schema.NendoCollection:
    """Add a track to a collection by creating a relationship between them.

    Args:
        track_id (Union[str, uuid.UUID]): ID of the track to add.
        collection_id (Union[str, uuid.UUID]): ID of the collection to
            which to add the track.
        position (int, optional): Target position of the track inside
            the collection. If omitted, the track is appended at the end.
        meta (Dict[str, Any]): Metadata of the relationship.

    Raises:
        schema.NendoCollectionNotFoundError: If the collection does not
            exist. NOTE(review): the same exception type is also raised
            when the track does not exist.

    Returns:
        schema.NendoCollection: The updated NendoCollection object.
    """
    with self.session_scope() as session:
        # Normalize string IDs to UUIDs
        collection_id = (
            uuid.UUID(collection_id)
            if isinstance(collection_id, str)
            else collection_id
        )
        track_id = uuid.UUID(track_id) if isinstance(track_id, str) else track_id

        # Both the collection and the track have to exist
        collection = (
            session.query(model.NendoCollectionDB)
            .filter_by(id=collection_id)
            .first()
        )
        track = session.query(model.NendoTrackDB).filter_by(id=track_id).first()
        if not collection:
            raise schema.NendoCollectionNotFoundError(
                "The collection does not exist", collection_id,
            )
        if not track:
            raise schema.NendoCollectionNotFoundError(
                "The track does not exist", track_id,
            )

        rel_db = model.TrackCollectionRelationshipDB
        if position is None:
            # No position given: append behind the current last entry
            last_entry = (
                session.query(rel_db)
                .filter_by(target_id=collection_id)
                .order_by(rel_db.relationship_position.desc())
                .first()
            )
            position = last_entry.relationship_position + 1 if last_entry else 0
        else:
            # Make room: move every entry at or after `position` back by one
            shifted = {rel_db.relationship_position: rel_db.relationship_position + 1}
            session.query(rel_db).filter(
                rel_db.target_id == collection_id,
                rel_db.relationship_position >= position,
            ).update(shifted)

        # Link the track to the collection at the determined position
        session.add(
            rel_db(
                source_id=track_id,
                target_id=collection_id,
                relationship_type="track",
                meta=meta or {},
                relationship_position=position,
            ),
        )
        session.commit()
        session.refresh(collection)
        return schema.NendoCollection.model_validate(collection)
def get_collection_tracks(
    self, collection_id: uuid.UUID,
) -> List[schema.NendoTrack]:
    """Get all tracks from a collection.

    Args:
        collection_id (uuid.UUID): ID of the collection to get the tracks from.

    Returns:
        List[schema.NendoTrack]: List of tracks in the collection, ordered
            by their position inside the collection. Empty if the
            collection has no tracks or does not exist.
    """
    with self.session_scope() as session:
        rel_db = model.TrackCollectionRelationshipDB
        tracks_db = (
            session.query(model.NendoTrackDB)
            .join(rel_db, rel_db.source_id == model.NendoTrackDB.id)
            .options(joinedload(model.NendoTrackDB.related_collections))
            .filter(rel_db.target_id == collection_id)
            # without an explicit ordering the DB returns rows arbitrarily
            .order_by(rel_db.relationship_position.asc())
            .all()
        )
        # .all() always returns a list, so no None check is needed
        return [schema.NendoTrack.model_validate(t) for t in tracks_db]
def get_collection(
    self, collection_id: uuid.UUID, details: bool = True,
) -> Union[schema.NendoCollection, schema.NendoCollectionSlim]:
    """Look up a collection by its ID.

    Args:
        collection_id (uuid.UUID): ID of the target collection.
        details (bool, optional): Flag that defines whether the result should
            contain all fields or only a subset. Defaults to True.

    Returns:
        Union[NendoCollection, NendoCollectionSlim]: Collection object, compact
            version if the `details` flag has been set to False. None if
            no collection with the given ID exists.
    """
    with self.session_scope() as session:
        lookup = session.query(model.NendoCollectionDB)
        if details:
            # the full version also carries the collection's tracks
            lookup = lookup.options(
                joinedload(model.NendoCollectionDB.related_tracks).joinedload(
                    model.TrackCollectionRelationshipDB.source,
                ),
            )
        collection_db = lookup.filter(
            model.NendoCollectionDB.id == collection_id,
        ).first()

        if collection_db is None:
            return None
        result_schema = (
            schema.NendoCollection if details else schema.NendoCollectionSlim
        )
        return result_schema.model_validate(collection_db)
def get_collections(
    self,
    query: Optional[Query] = None,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: Optional[str] = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    session: Optional[Session] = None,
) -> Union[List, Iterator]:
    """Get a list of collections.

    Args:
        query (Optional[Query]): Query object to build from. If omitted, a
            query over all collections owned by the user is created.
        user_id (Union[str, UUID], optional): The user ID to filter for.
        order_by (Optional[str]): Key used for ordering the results.
        order (Optional[str]): Order in which to retrieve results
            ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results
            (requires limit).
        session (sqlalchemy.Session): Session to use. If omitted, a new
            session scope is opened.

    Returns:
        Union[List, Iterator]: List or generator of collections, depending
            on the configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    # Honor a caller-provided session/query instead of silently
    # replacing them with fresh ones.
    scope = session if session is not None else self.session_scope()
    with scope as session_local:
        if query is None:
            query = session_local.query(model.NendoCollectionDB).filter(
                model.NendoCollectionDB.user_id == user_id,
            )
        return self._get_collections_db(
            query, user_id, order_by, order, limit, offset, session_local,
        )
@schema.NendoPlugin.stream_output
def _get_collections_db(
    self,
    query: Optional[Query] = None,
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: Optional[str] = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    session: Optional[Session] = None,
) -> Union[List, Iterator]:
    """Get a list of collections from the DB.

    Args:
        query (Optional[Query]): Query object to build from. If omitted, a
            query over all collections owned by the user is created.
        user_id (Union[str, UUID], optional): The user ID to filter for.
        order_by (Optional[str]): Name of the collection attribute used for
            ordering the results, or "random".
        order (Optional[str]): Order in which to retrieve results
            ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results
            (requires limit).
        session (sqlalchemy.Session): Session to use. If omitted, a new
            session scope is opened.

    Returns:
        Union[List, Iterator]: List or generator of collections, depending
            on the configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    s = session or self.session_scope()
    with s as session_local:
        if query is not None:
            query_local = query
        else:
            query_local = session_local.query(model.NendoCollectionDB).filter(
                model.NendoCollectionDB.user_id == user_id,
            )

        if order_by:
            if order_by == "random":
                query_local = query_local.order_by(func.random())
            else:
                # resolve the key on the model so it stays unambiguous when
                # the query joins other tables (same approach as get_tracks)
                column = getattr(model.NendoCollectionDB, order_by)
                query_local = query_local.order_by(
                    desc(column) if order == "desc" else asc(column),
                )

        if limit:
            query_local = query_local.limit(limit)
            if offset:
                query_local = query_local.offset(offset)

        # Iterate the refined query (`query_local`), not the raw input
        # `query`: otherwise ordering/limit/offset are dropped and a
        # missing `query` cannot be iterated at all.
        if self.config.stream_chunk_size > 1:
            chunk = []
            for collection in query_local:
                chunk.append(schema.NendoCollection.model_validate(collection))
                if len(chunk) == self.config.stream_chunk_size:
                    yield chunk
                    chunk = []
            if chunk:  # yield remaining collections in non-full chunk
                yield chunk
        else:
            for collection in query_local:
                yield schema.NendoCollection.model_validate(collection)
def find_collections(
    self,
    value: str = "",
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: Optional[str] = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Union[List, Iterator]:
    """Find collections with a search term in the name or description field.

    Args:
        value (str): Term to be searched for. It is matched
            case-insensitively as a substring against the collection's name
            and description (the meta field is currently not searched).
        user_id (Union[str, UUID], optional): The user ID to filter for.
        order_by (Optional[str]): Key used for ordering the results.
        order (Optional[str]): Order in which to retrieve results
            ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results
            (requires limit).

    Returns:
        Union[List, Iterator]: List or generator of collections, depending
            on the configuration variable stream_mode
    """
    user_id = self._ensure_user_uuid(user_id)
    pattern = f"%{value}%"
    with self.session_scope() as session:
        collection_db = model.NendoCollectionDB
        text_match = or_(
            collection_db.name.ilike(pattern),
            collection_db.description.ilike(pattern),
        )
        owned_by_user = collection_db.user_id == user_id
        search_query = session.query(collection_db).filter(
            and_(text_match, owned_by_user),
        )
        return self._get_collections_db(
            search_query, user_id, order_by, order, limit, offset, session,
        )
def get_related_collections(
    self,
    collection_id: Union[str, uuid.UUID],
    user_id: Optional[Union[str, uuid.UUID]] = None,
    order_by: Optional[str] = None,
    order: Optional[str] = "asc",
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Union[List, Iterator]:
    """Get collections with a relationship to the collection with collection_id.

    Args:
        collection_id (Union[str, UUID]): ID of the collection whose related
            collections should be retrieved.
        user_id (Union[str, UUID], optional): The user ID to filter for.
        order_by (Optional[str]): Key used for ordering the results.
        order (Optional[str]): Order in which to retrieve results ("asc" or "desc").
        limit (Optional[int]): Limit the number of returned results.
        offset (Optional[int]): Offset into the paginated results (requires limit).

    Returns:
        Union[List, Iterator]: The related collections, in the form produced
            by `_get_collections_db` (list or generator).
    """
    user_id = self._ensure_user_uuid(user_id)
    with self.session_scope() as session:
        query = self._get_related_collections_query(
            collection_id=ensure_uuid(collection_id),
            session=session,
            user_id=user_id,
        )
        # The positional layout must match the call in `find_collections`
        # (query, user_id, order_by, order, limit, offset, session).
        # Omitting `user_id` shifts every following argument by one slot.
        return self._get_collections_db(
            query,
            user_id,
            order_by,
            order,
            limit,
            offset,
            session,
        )
def remove_track_from_collection(
    self,
    track_id: Union[str, uuid.UUID],
    collection_id: Union[str, uuid.UUID],
) -> bool:
    """Delete the relationship between a track and a collection.

    Both IDs are normalized with `ensure_uuid` and the removal itself is
    delegated to `_remove_track_from_collection_db`. The existence of the
    track and the collection is not checked here.

    Args:
        track_id (Union[str, uuid.UUID]): ID of the track to detach.
        collection_id (Union[str, uuid.UUID]): ID of the collection the track
            should be removed from.

    Returns:
        success (bool): True if removal was successful, False otherwise.
    """
    with self.session_scope() as session:
        collection_uuid = ensure_uuid(collection_id)
        track_uuid = ensure_uuid(track_id)
        removed = self._remove_track_from_collection_db(
            track_id=track_uuid,
            collection_id=collection_uuid,
            session=session,
        )
        return removed
def update_collection(
    self,
    collection: schema.NendoCollection,
) -> schema.NendoCollection:
    """Persist the state of the given collection to the database.

    The stored row is looked up by `collection.id`; its name, user ID,
    description, collection type, visibility and meta are overwritten with
    the values of the passed object.

    Args:
        collection (NendoCollection): The collection to store.

    Raises:
        NendoCollectionNotFoundError: If the collection with
            the given ID was not found.

    Returns:
        NendoCollection: The updated collection, re-read from the database.
    """
    updated_fields = (
        "name",
        "user_id",
        "description",
        "collection_type",
        "visibility",
        "meta",
    )
    with self.session_scope() as session:
        stored = (
            session.query(model.NendoCollectionDB)
            .filter_by(id=collection.id)
            .first()
        )
        if stored is None:
            raise schema.NendoCollectionNotFoundError(
                "Collection not found",
                collection.id,
            )

        for field_name in updated_fields:
            setattr(stored, field_name, getattr(collection, field_name))

        session.commit()
        session.refresh(stored)
        return schema.NendoCollection.model_validate(stored)
def remove_collection(
    self,
    collection_id: uuid.UUID,
    remove_relationships: bool = False,
) -> bool:
    """Delete the collection identified by `collection_id`.

    Collection-to-collection relationships block the deletion unless
    `remove_relationships` is set. Track-to-collection relationships that
    point at the collection are always cleaned up.

    Args:
        collection_id (uuid.UUID): ID of the collection to remove.
        remove_relationships (bool, optional):
            If False prevent deletion if related collections exist,
            if True delete relationships together with the object.
            Defaults to False.

    Returns:
        bool: True if the deletion was carried out, False if it was refused
            because of existing relationships.
    """
    collection_rel = model.CollectionCollectionRelationshipDB
    track_rel = model.TrackCollectionRelationshipDB
    with self.session_scope() as session:
        is_source = collection_rel.source_id == collection_id
        is_target = collection_rel.target_id == collection_id
        first_relationship = (
            session.query(collection_rel)
            .filter(or_(is_source, is_target))
            .first()
        )

        if first_relationship is not None:
            if not remove_relationships:
                logger.warning(
                    "Cannot remove due to existing relationships. "
                    "Set `remove_relationships=True` to remove them.",
                )
                return False
            session.query(collection_rel).filter(
                or_(is_target, is_source),
            ).delete()
            session.commit()

        # clean up track-collection relationships
        session.query(track_rel).filter(
            track_rel.target_id == collection_id,
        ).delete()
        session.commit()

        # delete the collection itself
        session.query(model.NendoCollectionDB).filter(
            model.NendoCollectionDB.id == collection_id,
        ).delete()

    return True
def export_collection(
    self,
    collection_id: Union[str, uuid.UUID],
    export_path: str,
    filename_suffix: str = "nendo",
    file_format: str = "wav",
) -> List[str]:
    """Export all tracks of a collection into a directory.

    Each track is written via `export_track` to a file named
    ``<original name>_<filename_suffix>_<timestamp>.<file_format>``, where the
    original name is the track's "original_filename" meta value if present
    and the resource's file name otherwise. The timestamp is taken once, so
    all files of one export share it.

    Args:
        collection_id (Union[str, uuid.UUID]): The ID of the target
            collection to export.
        export_path (str): Path to a directory into which the collection's tracks
            should be exported. Created if it does not exist.
        filename_suffix (str): The suffix which should be appended to each
            exported track's filename.
        file_format (str, optional): Format of the exported tracks.
            Defaults to "wav".

    Returns:
        List[str]: A list with all full paths to the exported files.
    """
    tracks = self.get_collection_tracks(collection_id)
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")  # noqa: DTZ005
    if not os.path.isdir(export_path):
        logger.info(
            f"Export path {export_path} does not exist, creating now.",
        )
        os.makedirs(export_path, exist_ok=True)

    exported_paths = []
    for track in tracks:
        base_name = (
            track.get_meta("original_filename")
            if track.has_meta("original_filename")
            else track.resource.file_name
        )
        target_path = os.path.join(
            export_path,
            f"{base_name}_{filename_suffix}_{timestamp}.{file_format}",
        )
        exported_paths.append(
            self.export_track(
                track_id=track.id,
                file_path=target_path,
                file_format=file_format,
            ),
        )
    return exported_paths
2178 # =========================
2179 #
2180 # BLOB MANAGEMENT FUNCTIONS
2181 #
2182 # =========================
def _upsert_blob_db(
    self,
    blob: schema.NendoBlobBase,
    session: Session,
) -> model.NendoBlobDB:
    """Insert a new blob row or update the resource of an existing one.

    A blob whose exact type is `schema.NendoBlobCreate` is inserted as a new
    row. Any other blob is looked up by its ID and only its `resource` is
    overwritten. The session is committed in both cases.

    Args:
        blob (schema.NendoBlobBase): Blob object to be created or updated.
        session (Session): Session to be used for the transaction.

    Raises:
        NendoBlobNotFoundError: If an existing blob should be updated but no
            row with its ID exists.

    Returns:
        model.NendoBlobDB: The ORM model object of the upserted blob.
    """
    if type(blob) is schema.NendoBlobCreate:
        # create new blob
        blob_row = model.NendoBlobDB(**blob.model_dump())
        session.add(blob_row)
        session.commit()
        return blob_row

    # update existing blob
    blob_row = session.query(model.NendoBlobDB).filter_by(id=blob.id).one_or_none()
    if blob_row is None:
        raise schema.NendoBlobNotFoundError("Blob not found", target_id=blob.id)
    blob_row.resource = blob.resource.model_dump()
    session.commit()
    return blob_row
def _create_blob_from_bytes(
    self,
    data: bytes,
    user_id: Optional[uuid.UUID] = None,
) -> schema.NendoBlobCreate:
    """Create a blob from the given bytes.

    The bytes are written through the storage driver to a newly generated
    ``.pkl`` file name and wrapped in a `NendoResource` of type "blob".

    Args:
        data (bytes): The raw data to store.
        user_id (Optional[uuid.UUID]): ID of the user owning the blob. Falls
            back to `self.user.id` if not given.

    Raises:
        NendoLibraryError: If the storage driver fails to write the data.

    Returns:
        schema.NendoBlobCreate: The (not yet persisted) blob object.
    """
    # Resolve the owner once so the stored file and the returned blob are
    # attributed to the same user.
    owner_id = user_id or self.user.id
    owner = str(owner_id)

    file_name = None
    try:
        file_name = self.storage_driver.generate_filename(
            filetype="pkl",
            user_id=owner,
        )
        target_file = self.storage_driver.save_bytes(
            file_name=file_name,
            data=data,
            user_id=owner,
        )
    except Exception as e:  # noqa: BLE001
        # Report the generated file name (the save target is not known when
        # saving fails) and keep the original exception as the cause.
        raise schema.NendoLibraryError(
            f"Failed writing file {file_name} to the library. Error: {e}.",
        ) from e

    resource = schema.NendoResource(
        file_path=self.storage_driver.get_file_path(
            src=target_file,
            user_id=owner,
        ),
        file_name=self.storage_driver.get_file_name(
            src=target_file,
            user_id=owner,
        ),
        resource_type="blob",
        location=self.storage_driver.get_driver_location(),
        meta={},
    )
    return schema.NendoBlobCreate(resource=resource.model_dump(), user_id=owner_id)
def _create_blob_from_file(
    self,
    file_path: FilePath,
    copy_to_library: Optional[bool] = None,
    user_id: Optional[uuid.UUID] = None,
) -> schema.NendoBlobCreate:
    """Create a blob from a given filepath.

    Args:
        file_path (FilePath): Path to the file to create the blob from.
        copy_to_library (Optional[bool]): Whether to copy the file into the
            library via the storage driver. If None, the configuration value
            `copy_to_library` is used. An explicit False is respected.
        user_id (Optional[uuid.UUID]): ID of the user owning the blob. Falls
            back to `self.user.id` if not given.

    Raises:
        NendoResourceError: If `file_path` is not an existing file.
        NendoLibraryError: If copying the file into the library fails.

    Returns:
        schema.NendoBlobCreate: The (not yet persisted) blob object.
    """
    if not os.path.isfile(file_path):
        raise schema.NendoResourceError("File not found", file_path)

    # Only fall back to the configuration when the caller made no choice;
    # `copy_to_library or config` would override an explicit False.
    if copy_to_library is None:
        copy_to_library = self.config.copy_to_library

    # Resolve the owner once so the stored file and the returned blob are
    # attributed to the same user.
    owner_id = user_id or self.user.id
    owner = str(owner_id)

    checksum = md5sum(file_path)
    meta = {"checksum": checksum}
    if copy_to_library:
        try:
            meta.update(
                {
                    "original_filename": os.path.basename(file_path),
                    "original_filepath": os.path.dirname(file_path),
                    "original_size": os.stat(file_path).st_size,
                    "original_checksum": checksum,
                },
            )
            target_file = self.storage_driver.save_file(
                file_name=self.storage_driver.generate_filename(
                    filetype=os.path.splitext(file_path)[1][1:],  # without the dot
                    user_id=owner,
                ),
                file_path=file_path,
                user_id=owner,
            )
            location = self.storage_driver.get_driver_location()
        except Exception as e:  # noqa: BLE001
            # Name the source file (the target is unknown when saving fails)
            # and keep the original exception as the cause.
            raise schema.NendoLibraryError(
                f"Failed storing blob {file_path}. Error: {e}.",
            ) from e
    else:
        target_file = file_path
        location = schema.ResourceLocation.original

    # NOTE(review): `_create_blob_from_bytes` passes resource_type="blob" here,
    # this method relies on the NendoResource default — confirm that is intended.
    resource = schema.NendoResource(
        file_path=os.path.dirname(target_file),
        file_name=os.path.basename(target_file),
        location=location,
        meta=meta,
    )
    return schema.NendoBlobCreate(resource=resource.model_dump(), user_id=owner_id)
def load_blob(
    self,
    blob_id: uuid.UUID,
    user_id: Optional[Union[str, uuid.UUID]] = None,
) -> Optional[schema.NendoBlob]:
    """Load a blob of data into memory.

    The blob's file is made available locally through the storage driver and
    read according to its extension: ``.pkl`` is unpickled, ``.npy`` is read
    with `numpy.load`, ``.wav`` is read with `librosa.load` (only the signal
    is kept, the sample rate is discarded). For any other extension an error
    is logged and the blob is returned without data.

    Args:
        blob_id (uuid.UUID): The UUID of the blob.
        user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user
            who's loading the blob.

    Returns:
        Optional[schema.NendoBlob]: The loaded blob, or None if no blob with
            the given ID exists.
    """
    user_id = self._ensure_user_uuid(user_id)
    with self.session_scope() as session:
        blob_db = (
            session.query(model.NendoBlobDB)
            .filter(model.NendoBlobDB.id == blob_id)
            .one_or_none()
        )
        if blob_db is None:
            # Without this guard `blob` would be unbound at the return below.
            return None

        blob = schema.NendoBlob.model_validate(blob_db)
        local_blob = self.storage_driver.as_local(
            file_path=blob.resource.src,
            location=blob.resource.location,
            user_id=user_id,
        )

        # load blob data into memory
        extension = os.path.splitext(local_blob)[1]
        if extension == ".pkl":
            # NOTE(review): unpickling executes arbitrary code; only safe for
            # blobs that were written by a trusted source.
            with open(local_blob, "rb") as f:
                blob.data = pickle.load(f)  # noqa: S301
        elif extension == ".npy":
            blob.data = np.load(local_blob)
        elif extension == ".wav":
            # librosa.load returns (signal, sample_rate); keep the signal.
            blob.data, _ = librosa.load(local_blob, mono=False)
        else:
            logger.error(
                "Blob file format not recognized. "
                "Returning blob with empty data.",
            )

        return blob
def store_blob(
    self,
    file_path: Union[str, FilePath],
    user_id: Optional[Union[str, uuid.UUID]] = None,
) -> schema.NendoBlob:
    """Store the file at the given path as a blob.

    The blob object is built by `_create_blob_from_file` and then written to
    the database with `_upsert_blob_db`.

    Args:
        file_path (Union[str, FilePath]): Path to the file to store as blob.
        user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user
            who's storing the file to blob.

    Returns:
        schema.NendoBlob: The stored blob.
    """
    user_id = self._ensure_user_uuid(user_id)
    blob = self._create_blob_from_file(file_path=file_path, user_id=user_id)
    if blob is None:
        return blob
    with self.session_scope() as session:
        stored_row = self._upsert_blob_db(blob, session)
        blob = schema.NendoBlob.model_validate(stored_row)
    return blob
def store_blob_from_bytes(
    self,
    data: bytes,
    user_id: Optional[Union[str, uuid.UUID]] = None,
) -> schema.NendoBlob:
    """Store data of type `bytes` as a blob.

    The blob object is built by `_create_blob_from_bytes` and then written to
    the database with `_upsert_blob_db`.

    Args:
        data (bytes): The data to store.
        user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user
            who's storing the bytes to blob.

    Returns:
        schema.NendoBlob: The stored blob.
    """
    user_id = self._ensure_user_uuid(user_id)
    blob = self._create_blob_from_bytes(data=data, user_id=user_id)
    if blob is None:
        return blob
    with self.session_scope() as session:
        stored_row = self._upsert_blob_db(blob, session)
        blob = schema.NendoBlob.model_validate(stored_row)
    return blob
def remove_blob(
    self,
    blob_id: uuid.UUID,
    remove_resources: bool = True,
    user_id: Optional[Union[str, uuid.UUID]] = None,
) -> bool:
    """Delete a blob of data.

    Args:
        blob_id (uuid.UUID): The UUID of the blob.
        remove_resources (bool): If True, also remove the blob's file via the
            storage driver. A failure to remove the file is logged but does
            not abort the removal of the database entry.
        user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user
            who's removing the blob.

    Returns:
        success (bool): True if the blob was removed, False if no blob with
            the given ID exists.
    """
    user_id = self._ensure_user_uuid(user_id)
    with self.session_scope() as session:
        target = (
            session.query(model.NendoBlobDB)
            .filter(model.NendoBlobDB.id == blob_id)
            .first()
        )
        if target is None:
            # session.delete(None) would raise; report the miss instead.
            logger.warning("Blob %s not found, nothing to remove.", str(blob_id))
            return False

        session.delete(target)
        if remove_resources:
            logger.info("Removing resources associated with Blob %s", str(blob_id))
            file_name = None
            try:
                file_name = target.resource["file_name"]
                self.storage_driver.remove_file(
                    file_name=file_name,
                    user_id=user_id,
                )
            except Exception as e:  # noqa: BLE001
                # `target.resource` is subscripted like a mapping above, so it
                # has no `model_dump()`; log the file name instead of failing
                # inside the handler.
                logger.error("Removing %s failed: %s", file_name, e)
    return True
2421 # ==================================
2422 #
2423 # MISCELLANEOUS MANAGEMENT FUNCTIONS
2424 #
2425 # ==================================
def reset(
    self,
    force: bool = False,
    user_id: Optional[Union[str, uuid.UUID]] = None,
) -> None:
    """Reset the nendo library.

    Erase all tracks, collections, relationships and plugin data from the
    database and remove the files the storage driver lists for the user.
    Unless `force` is set, the user is asked for confirmation on stdin first.

    Args:
        force (bool, optional): Flag that specifies whether to skip asking the
            user for confirmation of the operation. Default is to ask the user.
        user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user
            who's resetting the library. If none is given, the configured
            nendo default user will be used.
    """
    user_id = self._ensure_user_uuid(user_id)
    should_proceed = (
        force
        or input(
            "Are you sure you want to reset the library? "
            "This will purge ALL tracks, collections and relationships! "
            "Enter 'y' to confirm: ",
        ).lower()
        == "y"
    )

    if not should_proceed:
        logger.info("Reset operation cancelled.")
        return

    logger.info("Resetting nendo library.")
    # NOTE(review): the database deletes below are not filtered by user_id;
    # only the file removal is scoped to the user — confirm this is intended.
    with self.session_scope() as session:
        # delete all relationships
        session.query(model.TrackTrackRelationshipDB).delete()
        session.query(model.TrackCollectionRelationshipDB).delete()
        session.query(model.CollectionCollectionRelationshipDB).delete()
        # delete all plugin data
        session.query(model.NendoPluginDataDB).delete()
        session.commit()
        # delete all collections
        session.query(model.NendoCollectionDB).delete()
        # delete all tracks
        session.query(model.NendoTrackDB).delete()
        # remove files
        for library_file in self.storage_driver.list_files(user_id=user_id):
            self.storage_driver.remove_file(file_name=library_file, user_id=user_id)