Coverage for src/nendo/library/sqlalchemy_library.py: 76%

694 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-30 09:47 +0100

1# -*- encoding: utf-8 -*- 

2# ruff: noqa: S603, S607, PLW1510 

3"""Module implementing the NendoLibraryPlugin using SQLAlchemy. 

4 

5This module implements an SQLAlchemy version of the NendoLibraryPlugin. 

6The plugin is not suitable to be used by itself but should be inherited from 

7when implementing a new backend for the Nendo Library using SQLAlchemy. An example 

8is given by the DuckDB implementation used as the default Nendo Library. 

9""" 

10 

11from __future__ import annotations 

12 

13import logging 

14import os 

15import pickle 

16import subprocess 

17import uuid 

18from contextlib import contextmanager 

19from datetime import datetime 

20from tempfile import NamedTemporaryFile 

21from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union 

22 

23import librosa 

24import numpy as np 

25import soundfile as sf 

26from sqlalchemy import Float, and_, asc, desc, func, or_, true 

27from sqlalchemy.orm import Query, Session, joinedload, sessionmaker 

28from sqlalchemy.sql.expression import cast 

29from sqlalchemy.sql.sqltypes import Text 

30from tinytag import TinyTag 

31 

32from nendo import schema 

33from nendo.library import model 

34from nendo.utils import AudioFileUtils, ensure_uuid, md5sum 

35 

36if TYPE_CHECKING: 

37 from pydantic import DirectoryPath, FilePath 

38 

39logger = logging.getLogger("nendo") 

40 

41 

42class SqlAlchemyNendoLibrary(schema.NendoLibraryPlugin): 

43 """Implementation of the `NendoLibraryPlugin` using SQLAlchemy.""" 

44 

45 iter_index: int = 0 

46 user: schema.NendoUser = None 

47 

48 def __init__( # noqa: D107 

49 self, 

50 **kwargs: Any, 

51 ) -> None: 

52 super().__init__(**kwargs) 

53 self.iter_index = 0 

54 

55 @property 

56 def default_user(self): 

57 """Default Nendo user.""" 

58 return schema.NendoUser( 

59 id=uuid.UUID(self.config.user_id), 

60 name=self.config.user_name, 

61 password="nendo", # noqa: S106 

62 email="info@okio.ai", 

63 verified=True, 

64 ) 

65 

66 @contextmanager 

67 def session_scope(self): 

68 """Provide a transactional scope around a series of operations.""" 

69 session = sessionmaker(autocommit=False, autoflush=False, bind=self.db)() 

70 try: 

71 yield session 

72 session.commit() 

73 except: 

74 session.rollback() 

75 raise 

76 finally: 

77 session.close() 

78 

79 def _disconnect(self): 

80 """Close DuckDB.""" 

81 self.db.close() 

82 

83 def _ensure_user_uuid(self, user_id: Optional[Union[str, uuid.UUID]] = None): 

84 return ensure_uuid(user_id) if user_id is not None else self.user.id 

85 

86 def _convert_plugin_data(self, value: Any, user_id: Optional[uuid.UUID] = None): 

87 # numpy matrices are stored as .npy blob and the blob ID is returned as 

88 # the plugin data value 

89 if isinstance(value, (np.ndarray, np.matrix)): 

90 with NamedTemporaryFile(suffix=".npy", delete=True) as tmpfile: 

91 np.save(tmpfile.name, value) 

92 blob = self.store_blob(file_path=tmpfile.name, user_id=user_id) 

93 return str(blob.id) 

94 try: 

95 return str(value) 

96 except Exception as e: # noqa: BLE001 

97 logger.error("Failed to load plugin data: %s", e) 

98 

99 # ========================== 

100 # 

101 # TRACK MANAGEMENT FUNCTIONS 

102 # 

103 # ========================== 

104 

105 def __len__(self): 

106 """Obtain the number of tracks.""" 

107 with self.session_scope() as session: 

108 return session.query(model.NendoTrackDB).count() 

109 

110 def __iter__(self): 

111 return self 

112 

113 def __next__(self): 

114 with self.session_scope() as session: 

115 query = session.query(model.NendoTrackDB) 

116 track = query.offset(self.iter_index).first() 

117 if track: 

118 self.iter_index += 1 

119 return schema.NendoTrack.model_validate(track) 

120 raise StopIteration 

121 

122 def _create_track_from_file( 

123 self, 

124 file_path: FilePath, 

125 track_type: str = "track", 

126 copy_to_library: Optional[bool] = None, 

127 skip_duplicate: Optional[bool] = None, 

128 user_id: Optional[uuid.UUID] = None, 

129 meta: Optional[Dict[str, Any]] = None, 

130 ) -> schema.NendoTrackCreate: 

131 """Create a NendoTrack from the file given by file_path. 

132 

133 Args: 

134 file_path (Union[FilePath, str]): Path to the file to be added. 

135 track_type (str, optional): Type of the track. Defaults to "track". 

136 copy_to_library (bool, optional): Flag that specifies whether 

137 the file should be copied into the library directory. 

138 Defaults to None. 

139 skip_duplicate (bool, optional): Flag that specifies whether a 

140 file should be added that already exists in the library, based on its 

141 file checksum. Defaults to None. 

142 user_id (UUID, optional): ID of user adding the track. 

143 meta (dict, optional): Metadata to attach to the track upon adding. 

144 

145 Returns: 

146 schema.NendoTrackCreate: The created NendoTrack. 

147 """ 

148 if not os.path.isfile(file_path): 

149 raise schema.NendoResourceError("File not found", file_path) 

150 

151 if not AudioFileUtils().is_supported_filetype(file_path): 

152 raise schema.NendoResourceError("Unsupported filetype", file_path) 

153 

154 file_checksum = md5sum(file_path) 

155 

156 # skip adding a duplicate based on config flag and hashsum of the file 

157 skip_duplicate = skip_duplicate or self.config.skip_duplicate 

158 if skip_duplicate: 

159 tracks = list(self.find_tracks(value=file_checksum)) 

160 if len(tracks) > 0: 

161 return schema.NendoTrack.model_validate(tracks[0]) 

162 

163 meta = meta or {} 

164 

165 # gather file metadata 

166 try: 

167 # extract ID3 tags 

168 tags = TinyTag.get(file_path) 

169 meta.update(tags.as_dict()) 

170 except KeyError as e: 

171 logger.error("Failed extracting tags from file: %s, Error: %s", file_path, e) 

172 

173 # convert and save to library 

174 copy_to_library = copy_to_library or self.config.copy_to_library 

175 if copy_to_library or (self.config.auto_convert and file_path.endswith(".mp3")): 

176 try: 

177 file_stats = os.stat(file_path) 

178 sr = None 

179 if self.config.auto_convert: 

180 if file_path.endswith(".mp3"): 

181 signal, sr = librosa.load(path=file_path, sr=None, mono=False) 

182 else: 

183 signal, sr = sf.read(file=file_path) 

184 # resample to default rate if required 

185 if self.config.auto_resample and sr != self.config.default_sr: 

186 logger.info( 

187 "Auto-converting to SR of %d", 

188 self.config.default_sr, 

189 ) 

190 signal = librosa.resample( 

191 signal, 

192 orig_sr=sr, 

193 target_sr=self.config.default_sr, 

194 ) 

195 sr = self.config.default_sr 

196 # sf.write expects the channels in the second dimension of the 

197 # signal array! Librosa.load() loads them into the first dimension 

198 signal = np.transpose(signal) if signal.shape[0] <= 2 else signal 

199 path_in_library = self.storage_driver.save_signal( 

200 file_name=self.storage_driver.generate_filename( 

201 filetype="wav", 

202 user_id=user_id, 

203 ), 

204 signal=signal, 

205 sr=sr, 

206 user_id=str(user_id) if user_id else str(self.user.id), 

207 ) 

208 else: 

209 # save file to storage in its original format 

210 path_in_library = self.storage_driver.save_file( 

211 file_name=self.storage_driver.generate_filename( 

212 filetype=os.path.splitext(file_path)[1][1:], # without dot 

213 user_id=user_id, 

214 ), 

215 file_path=file_path, 

216 user_id=str(user_id) if user_id else str(self.user.id), 

217 ) 

218 

219 # save the parsed sample rate 

220 if sr is not None: 

221 meta["sr"] = sr 

222 

223 meta.update( 

224 { 

225 "original_filename": os.path.basename(file_path), 

226 "original_filepath": os.path.dirname(file_path), 

227 "original_size": file_stats.st_size, 

228 "original_checksum": file_checksum, 

229 }, 

230 ) 

231 location = self.storage_driver.get_driver_location() 

232 except Exception as e: # noqa: BLE001 

233 raise schema.NendoLibraryError( 

234 f"Error copying file to the library: {e}.", 

235 ) from e 

236 else: 

237 path_in_library = file_path 

238 location = schema.ResourceLocation.original 

239 

240 resource = schema.NendoResource( 

241 file_path=self.storage_driver.get_file_path( 

242 src=path_in_library, 

243 user_id=str(user_id) if user_id else str(self.user.id), 

244 ), 

245 file_name=self.storage_driver.get_file_name( 

246 src=path_in_library, 

247 user_id=str(user_id) if user_id else str(self.user.id), 

248 ), 

249 resource_type="audio", 

250 location=location, 

251 meta=meta, 

252 ) 

253 return schema.NendoTrackCreate( 

254 nendo_instance=self.nendo_instance, 

255 resource=resource.model_dump(), 

256 user_id=user_id or self.user.id, 

257 track_type=track_type, 

258 meta=meta or {}, 

259 ) 

260 

261 def _create_track_from_signal( 

262 self, 

263 signal: np.ndarray, 

264 sr: int, 

265 track_type: str = "track", 

266 meta: Optional[Dict[str, Any]] = None, 

267 user_id: Optional[uuid.UUID] = None, 

268 ) -> schema.NendoTrackCreate: 

269 """Create a NendoTrack from the given signal. 

270 

271 Args: 

272 signal (np.ndarray): The numpy array containing the audio signal. 

273 sr (int): Sample rate 

274 track_type (str): Track type. 

275 user_id (UUID, optional): The ID of the user adding the track. 

276 meta (Dict[str, Any], optional): Track metadata. Defaults to {}. 

277 

278 Returns: 

279 schema.NendoTrackCreate: The created NendoTrack. 

280 """ 

281 target_file = None 

282 # sf.write expects the channels in the second dimension of the 

283 # signal array! Librosa.load() loads them into the first dimension 

284 signal = np.transpose(signal) if signal.shape[0] <= 2 else signal 

285 try: 

286 if self.config.auto_resample and sr != self.config.default_sr: 

287 logger.info("Auto-converting to SR of %d", self.config.default_sr) 

288 signal = librosa.resample( 

289 signal, orig_sr=sr, target_sr=self.config.default_sr 

290 ) 

291 sr = self.config.default_sr 

292 target_file = self.storage_driver.save_signal( 

293 file_name=self.storage_driver.generate_filename( 

294 filetype="wav", 

295 user_id=str(user_id) if user_id else str(self.user.id), 

296 ), 

297 signal=signal, 

298 sr=sr, 

299 user_id=user_id, 

300 ) 

301 except Exception as e: # noqa: BLE001 

302 raise schema.NendoLibraryError( 

303 f"Failed writing file {target_file} to the library. Error: {e}." 

304 ) from None 

305 resource = schema.NendoResource( 

306 file_path=self.storage_driver.get_file_path( 

307 src=target_file, 

308 user_id=str(user_id) if user_id else str(self.user.id), 

309 ), 

310 file_name=self.storage_driver.get_file_name( 

311 src=target_file, 

312 user_id=str(user_id) if user_id else str(self.user.id), 

313 ), 

314 resource_type="audio", 

315 location=self.storage_driver.get_driver_location(), 

316 ) 

317 # save sample rate 

318 meta = meta or {} 

319 meta["sr"] = sr 

320 return schema.NendoTrackCreate( 

321 resource=resource.model_dump(), 

322 user_id=user_id or self.user.id, 

323 track_type=track_type, 

324 meta=meta, 

325 ) 

326 

327 @schema.NendoPlugin.batch_process 

328 def _add_tracks_db( 

329 self, 

330 file_paths: List[FilePath], 

331 track_type: str = "track", 

332 copy_to_library: Optional[bool] = None, 

333 skip_duplicate: bool = True, 

334 user_id: Optional[uuid.UUID] = None, 

335 meta: Optional[Dict[str, Any]] = None, 

336 ) -> List[schema.NendoTrack]: 

337 """Add multiple tracks to the library from their file_paths. 

338 

339 Args: 

340 file_paths (List[FilePath]): A list of full file paths to add as tracks. 

341 track_type (str, optional): The track_type of the tracks to add. 

342 Defaults to "track". 

343 copy_to_library (bool, optional): Flag that specifies whether 

344 the file should be copied into the library directory. 

345 Defaults to None. 

346 skip_duplicate (bool, optional): Flag that specifies whether a 

347 file should be added that already exists in the library, based on its 

348 file checksum. Defaults to None. 

349 user_id (UUID, optional): ID of user adding the track. 

350 meta (dict, optional): Metadata to attach to the track upon adding. 

351 

352 Returns: 

353 List[schema.NendoTrack]: A list containing all added NendoTracks. 

354 """ 

355 create_list = [] 

356 for fp in file_paths: 

357 try: 

358 create_track = self._create_track_from_file( 

359 file_path=fp, 

360 track_type=track_type, 

361 copy_to_library=copy_to_library, 

362 skip_duplicate=skip_duplicate, 

363 user_id=user_id or self.user.id, 

364 meta=meta, 

365 ) 

366 create_list.append(create_track) 

367 except schema.NendoLibraryError as e: 

368 logger.error("Failed adding file %s. Error: %s", fp, e) 

369 with self.session_scope() as session: 

370 db_tracks = self._upsert_tracks_db(tracks=create_list, session=session) 

371 return [schema.NendoTrack.model_validate(t) for t in db_tracks] 

372 

373 def _get_related_tracks_query( 

374 self, 

375 track_id: uuid.UUID, 

376 session: Session, 

377 user_id: Optional[uuid.UUID] = None, 

378 ) -> Query: 

379 """Get tracks with a relationship to the track with track_id from the DB. 

380 

381 Args: 

382 track_id (UUID): ID of the track to be searched for. 

383 session (Session): Session to be used for the transaction. 

384 user_id (UUID, optional): The user ID to filter for. 

385 

386 Returns: 

387 Query: The SQLAlchemy query object. 

388 """ 

389 user_id = user_id or self.user.id 

390 return ( 

391 session.query(model.NendoTrackDB) 

392 .join( 

393 model.TrackTrackRelationshipDB, 

394 model.NendoTrackDB.id == model.TrackTrackRelationshipDB.source_id, 

395 ) 

396 .filter( 

397 and_( 

398 model.NendoTrackDB.user_id == user_id, 

399 or_( 

400 model.TrackTrackRelationshipDB.target_id == track_id, 

401 model.TrackTrackRelationshipDB.source_id == track_id, 

402 ), 

403 ), 

404 ) 

405 ) 

406 

407 def _upsert_track_track_relationship( 

408 self, relationship: schema.NendoRelationshipBase, session: Session, 

409 ) -> schema.NendoTrack: 

410 """Insert or replace a track-to-track relationship in the database. 

411 

412 Args: 

413 relationship (schema.NendoRelationshipBase): The relationship to upsert. 

414 session (sqlalchemy.Session): Session object to commit to. 

415 

416 Returns: 

417 schema.NendoTrack: The upserted NendoTrack. 

418 """ 

419 if type(relationship) == schema.NendoRelationshipCreate: 

420 # creating new relationship 

421 db_rel = model.TrackTrackRelationshipDB(**relationship.model_dump()) 

422 session.add(db_rel) 

423 else: 

424 # updating existing relationship 

425 db_rel = ( 

426 session.query(model.TrackTrackRelationshipDB) 

427 .filter_by(id=relationship.id) 

428 .first() 

429 ) 

430 db_rel.source_id = relationship.source_id 

431 db_rel.target_id = relationship.target_id 

432 db_rel.relationship_type = relationship.relationship_type 

433 db_rel.meta = relationship.meta 

434 session.commit() 

435 return db_rel 

436 

437 def _upsert_track_db( 

438 self, track: schema.NendoTrackBase, session: Session, 

439 ) -> model.NendoTrackDB: 

440 """Create track in DB or update if it exists. 

441 

442 Args: 

443 track (schema.NendoTrackBase): Track object to be created 

444 session (Session): Session to be used for the transaction 

445 

446 Returns: 

447 model.NendoTrackDB: The ORM model object of the upserted track 

448 """ 

449 if type(track) == schema.NendoTrackCreate: 

450 # create new track 

451 track_dict = track.model_dump() 

452 track_dict.pop("nendo_instance") 

453 db_track = model.NendoTrackDB(**track_dict) 

454 session.add(db_track) 

455 else: 

456 # update existing track 

457 db_track = ( 

458 session.query(model.NendoTrackDB).filter_by(id=track.id).one_or_none() 

459 ) 

460 if db_track is None: 

461 raise schema.NendoTrackNotFoundError("Track not found", id=track.id) 

462 db_track.user_id = track.user_id 

463 db_track.visibility = track.visibility 

464 db_track.resource = track.resource.model_dump() 

465 db_track.track_type = track.track_type 

466 db_track.images = track.images 

467 db_track.meta = track.meta 

468 session.commit() 

469 return db_track 

470 

471 def _upsert_tracks_db( 

472 self, tracks: List[schema.NendoTrackCreate], session: Session, 

473 ) -> List[model.NendoTrackDB]: 

474 """Create multiple tracks in DB or update if it exists. 

475 

476 Args: 

477 tracks (List[schema.NendoTrackCreate]): Track object to be created 

478 session (Session): Session to be used for the transaction 

479 

480 Returns: 

481 List[model.NendoTrackDB]: The ORM model objects of the upserted tracks 

482 """ 

483 db_tracks = [] 

484 for track in tracks: 

485 track_dict = track.model_dump() 

486 track_dict.pop("nendo_instance") 

487 db_tracks.append(model.NendoTrackDB(**track_dict)) 

488 session.add_all(db_tracks) 

489 session.commit() 

490 return db_tracks 

491 

492 def _get_all_plugin_data_db( 

493 self, 

494 track_id: uuid.UUID, 

495 session: Session, 

496 user_id: Optional[uuid.UUID] = None, 

497 ) -> List[model.NendoPluginDataDB]: 

498 """Get all plugin data related to a track from the DB. 

499 

500 Args: 

501 track_id (UUID): Track ID to get the related plugin data for. 

502 session (Session): SQLAlchemy session. 

503 user_id (UUID, optional): The user ID to filter for. 

504 

505 Returns: 

506 List[model.NendoPluginDataDB]: List of nendo plugin data entries. 

507 """ 

508 user_id = user_id or self.user.id 

509 with self.session_scope() as session: 

510 plugin_data_db = ( 

511 session.query(model.NendoPluginDataDB) 

512 .filter( 

513 and_( 

514 model.NendoPluginDataDB.track_id == track_id, 

515 model.NendoPluginDataDB.user_id == user_id, 

516 ), 

517 ) 

518 .all() 

519 ) 

520 return [ 

521 schema.NendoPluginData.model_validate(pdb) for pdb in plugin_data_db 

522 ] 

523 

524 def _get_all_plugin_data_db_by_name( 

525 self, 

526 track_id: uuid.UUID, 

527 plugin_name: str, 

528 session: Session, 

529 user_id: Optional[uuid.UUID] = None, 

530 ) -> List[model.NendoPluginDataDB]: 

531 """Get all plugin data related to a track and a given pluginfrom the DB. 

532 

533 Args: 

534 track_id (uuid.UUID): Track ID to get the related plugin data for. 

535 plugin_name (str): Name of the plugin to get related data for. 

536 session (Session): SQLAlchemy session. 

537 user_id (UUID, optional): The user ID to filter for. 

538 

539 Returns: 

540 List[model.NendoPluginDataDB]: List of nendo plugin data entries. 

541 """ 

542 user_id = user_id or self.user.id 

543 with self.session_scope() as session: 

544 plugin_data_db = ( 

545 session.query(model.NendoPluginDataDB) 

546 .filter( 

547 and_( 

548 model.NendoPluginDataDB.track_id == track_id, 

549 model.NendoPluginDataDB.plugin_name == plugin_name, 

550 model.NendoPluginDataDB.user_id == user_id, 

551 ), 

552 ) 

553 .all() 

554 ) 

555 return ( 

556 schema.NendoPluginData.model_validate(plugin_data_db) 

557 if plugin_data_db is not None 

558 else None 

559 ) 

560 

561 def _get_latest_plugin_data_db( 

562 self, 

563 track_id: uuid.UUID, 

564 plugin_name: str, 

565 plugin_version: str, 

566 key: str, 

567 session: Session, 

568 user_id: Optional[uuid.UUID] = None, 

569 ) -> model.NendoPluginDataDB: 

570 """Get the latest plugin data for a given track id, plugin name and data key. 

571 

572 Args: 

573 track_id (uuid.UUID): Track ID to get the related plugin data for. 

574 plugin_name (str): Name of the plugin to get related data for. 

575 plugin_version (str): Version of the plugin to get related data for. 

576 key (str): Key by which to filter the plugin data. 

577 session (Session): SQLAlchemy session. 

578 user_id (UUID, optional): The user ID to filter for. 

579 

580 Returns: 

581 model.NendoPluginDataDB: A single nendo plugin data entry. 

582 """ 

583 user_id = user_id or self.user.id 

584 with self.session_scope() as session: 

585 plugin_data_db = ( 

586 session.query(model.NendoPluginDataDB) 

587 .filter( 

588 and_( 

589 model.NendoPluginDataDB.track_id == track_id, 

590 model.NendoPluginDataDB.plugin_name == plugin_name, 

591 model.NendoPluginDataDB.plugin_version == plugin_version, 

592 model.NendoPluginDataDB.key == key, 

593 model.NendoPluginDataDB.user_id == user_id, 

594 ), 

595 ) 

596 .order_by(model.NendoPluginDataDB.updated_at.desc()) 

597 .first() 

598 ) 

599 return ( 

600 schema.NendoPluginData.model_validate(plugin_data_db) 

601 if plugin_data_db is not None 

602 else None 

603 ) 

604 

605 def _insert_plugin_data_db( 

606 self, plugin_data: schema.NendoPluginDataCreate, session: Session, 

607 ) -> model.NendoPluginDataDB: 

608 db_plugin_data = model.NendoPluginDataDB(**plugin_data.model_dump()) 

609 session.add(db_plugin_data) 

610 session.commit() 

611 return db_plugin_data 

612 

613 def _update_plugin_data_db( 

614 self, 

615 existing_plugin_data: model.NendoPluginDataDB, 

616 plugin_data: schema.NendoPluginData, 

617 session: Session, 

618 ) -> model.NendoPluginDataDB: 

619 if existing_plugin_data is None: 

620 logger.error("Plugin data not found!") 

621 return None 

622 existing_plugin_data.key = plugin_data.key 

623 existing_plugin_data.value = plugin_data.value 

624 existing_plugin_data.user_id = plugin_data.user_id 

625 session.commit() 

626 return existing_plugin_data 

627 

628 def add_track( 

629 self, 

630 file_path: Union[FilePath, str], 

631 track_type: str = "track", 

632 copy_to_library: Optional[bool] = None, 

633 skip_duplicate: Optional[bool] = None, 

634 user_id: Optional[uuid.UUID] = None, 

635 meta: Optional[Dict[str, Any]] = None, 

636 ) -> schema.NendoTrack: 

637 """Add the track given by path to the library. 

638 

639 Args: 

640 file_path (Union[FilePath, str]): Path to the file to be added. 

641 track_type (str, optional): Type of the track. Defaults to "track". 

642 copy_to_library (bool, optional): Flag that specifies whether 

643 the file should be copied into the library directory. 

644 Defaults to None. 

645 skip_duplicate (bool, optional): Flag that specifies whether a 

646 file should be added that already exists in the library, based on its 

647 file checksum. Defaults to None. 

648 user_id (UUID, optional): ID of user adding the track. 

649 meta (dict, optional): Metadata to attach to the track upon adding. 

650 

651 Returns: 

652 schema.NendoTrack: The track that was added to the library. 

653 """ 

654 skip_duplicate = skip_duplicate or self.config.skip_duplicate 

655 track = self._create_track_from_file( 

656 file_path=file_path, 

657 track_type=track_type, 

658 copy_to_library=copy_to_library, 

659 skip_duplicate=skip_duplicate, 

660 user_id=user_id or self.user.id, 

661 meta=meta, 

662 ) 

663 if track is not None: 

664 with self.session_scope() as session: 

665 db_track = self._upsert_track_db(track=track, session=session) 

666 track = schema.NendoTrack.model_validate(db_track) 

667 return track 

668 

669 def add_track_from_signal( 

670 self, 

671 signal: np.ndarray, 

672 sr: int, 

673 track_type: str = "track", 

674 user_id: Optional[uuid.UUID] = None, 

675 meta: Optional[Dict[str, Any]] = None, 

676 ) -> schema.NendoTrack: 

677 """Add a track to the library that is described by the given signal. 

678 

679 Args: 

680 signal (np.ndarray): The numpy array containing the audio signal. 

681 sr (int): Sample rate 

682 track_type (str): Track type. 

683 user_id (UUID, optional): The ID of the user adding the track. 

684 meta (Dict[str, Any], optional): Track metadata. Defaults to {}. 

685 

686 Returns: 

687 schema.NendoTrack: The added NendoTrack 

688 """ 

689 track = self._create_track_from_signal( 

690 signal=signal, 

691 sr=sr, 

692 track_type=track_type, 

693 meta=meta, 

694 user_id=user_id or self.user.id, 

695 ) 

696 if track is not None: 

697 with self.session_scope() as session: 

698 db_track = self._upsert_track_db(track=track, session=session) 

699 track = schema.NendoTrack.model_validate(db_track) 

700 return track 

701 

702 def add_tracks( 

703 self, 

704 path: Union[str, DirectoryPath], 

705 track_type: str = "track", 

706 user_id: Optional[Union[str, uuid.UUID]] = None, 

707 copy_to_library: Optional[bool] = None, 

708 skip_duplicate: bool = True, 

709 ) -> schema.NendoCollection: 

710 """Scan the provided path and upsert the information into the library. 

711 

712 Args: 

713 path (Union[str, DirectoryPath]): Path to the directory to be scanned 

714 track_type (str, optional): Track type for the new tracks 

715 user_id (UUID, optional): The ID of the user adding the track. 

716 copy_to_library (bool): Copy and convert the data into the nendo Library? 

717 skip_duplicate (bool): Skip adding duplicates? 

718 

719 Returns: 

720 tracks (list[NendoTrack]): The tracks that were added to the Library 

721 """ 

722 user_id = self._ensure_user_uuid(user_id) 

723 file_list = [] 

724 if not os.path.exists(path): 

725 raise schema.NendoLibraryError(f"Source directory {path} does not exist.") 

726 for root, _, files in os.walk(path): 

727 file_list.extend( 

728 [ 

729 os.path.join(root, file) 

730 for file in files 

731 if AudioFileUtils().is_supported_filetype(file) 

732 ], 

733 ) 

734 tracks = self._add_tracks_db( 

735 file_paths=file_list, 

736 track_type=track_type, 

737 copy_to_library=copy_to_library, 

738 skip_duplicate=skip_duplicate, 

739 user_id=user_id, 

740 ) 

741 return self.add_collection(name=path, track_ids=[t.id for t in tracks]) 

742 

743 def add_track_relationship( 

744 self, 

745 track_one_id: Union[str, uuid.UUID], 

746 track_two_id: Union[str, uuid.UUID], 

747 relationship_type: str = "relationship", 

748 meta: Optional[Dict[str, Any]] = None, 

749 ) -> bool: 

750 """Add a relationship between two tracks.""" 

751 track_one = self.get_track(track_one_id) # track 

752 track_two = self.get_track(track_two_id) # related 

753 

754 # check that tracks are not the same 

755 if track_one_id == track_two_id: 

756 logger.error("Error must provide two different existing track ids") 

757 return False 

758 

759 # check if tracks exist 

760 if track_one is None: 

761 logger.error("Error track id %s not found", str(track_one_id)) 

762 return False 

763 

764 if track_two is None: 

765 logger.error("Error track id %s not found", str(track_two_id)) 

766 return False 

767 

768 # create bidirectional relationship 

769 relationship_from = schema.NendoRelationshipCreate( 

770 source_id=track_one.id, 

771 target_id=track_two.id, 

772 relationship_type=relationship_type, 

773 meta=meta or {}, 

774 ) 

775 relationship_to = schema.NendoRelationshipCreate( 

776 source_id=track_two.id, 

777 target_id=track_one.id, 

778 relationship_type=relationship_type, 

779 meta=meta or {}, 

780 ) 

781 with self.session_scope() as session: 

782 relationship_from = schema.NendoRelationship.model_validate( 

783 self._upsert_track_track_relationship( 

784 relationship=relationship_from, session=session, 

785 ), 

786 ) 

787 _ = schema.NendoRelationship.model_validate( 

788 self._upsert_track_track_relationship( 

789 relationship=relationship_to, session=session, 

790 ), 

791 ) 

792 # avoid redundancy; only append one direction 

793 track_one.related_tracks.append(relationship_from) 

794 return True 

795 

796 def add_related_track( 

797 self, 

798 file_path: Union[str, FilePath], 

799 related_track_id: Union[str, uuid.UUID], 

800 track_type: str = "track", 

801 user_id: Optional[Union[str, uuid.UUID]] = None, 

802 track_meta: Optional[Dict[str, Any]] = None, 

803 relationship_type: str = "relationship", 

804 meta: Optional[Dict[str, Any]] = None, 

805 ) -> schema.NendoTrack: 

806 """Add a track from a file with a relationship to another track.""" 

807 user_id = self._ensure_user_uuid(user_id=user_id) 

808 related_track_id = ensure_uuid(related_track_id) 

809 track = self.add_track( 

810 file_path=file_path, track_type=track_type, user_id=user_id, meta=track_meta, 

811 ) 

812 # create bidirectional relationship 

813 relationship_from = schema.NendoRelationshipCreate( 

814 source_id=track.id, 

815 target_id=related_track_id, 

816 relationship_type=relationship_type, 

817 meta=meta or {}, 

818 ) 

819 relationship_to = schema.NendoRelationshipCreate( 

820 source_id=related_track_id, 

821 target_id=track.id, 

822 relationship_type=relationship_type, 

823 meta=meta or {}, 

824 ) 

825 with self.session_scope() as session: 

826 relationship_from = schema.NendoRelationship.model_validate( 

827 self._upsert_track_track_relationship( 

828 relationship=relationship_from, session=session, 

829 ), 

830 ) 

831 _ = schema.NendoRelationship.model_validate( 

832 self._upsert_track_track_relationship( 

833 relationship=relationship_to, session=session, 

834 ), 

835 ) 

836 # avoid redundancy; only append one direction 

837 track.related_tracks.append(relationship_from) 

838 return track 

839 

840 def add_related_track_from_signal( 

841 self, 

842 signal: np.ndarray, 

843 sr: int, 

844 related_track_id: Union[str, uuid.UUID], 

845 track_type: str = "track", 

846 user_id: Optional[uuid.UUID] = None, 

847 track_meta: Optional[Dict[str, Any]] = None, 

848 relationship_type: str = "relationship", 

849 meta: Optional[Dict[str, Any]] = None, 

850 ) -> schema.NendoTrack: 

851 """Add a track from a signal with a relationship to another track.""" 

852 user_id = user_id or self.user.id 

853 related_track_id = ensure_uuid(related_track_id) 

854 track = self.add_track_from_signal( 

855 signal=signal, sr=sr, track_type=track_type, meta=track_meta, 

856 ) 

857 # create bidirectional relationship 

858 relationship_from = schema.NendoRelationshipCreate( 

859 source_id=track.id, 

860 target_id=related_track_id, 

861 relationship_type=relationship_type, 

862 meta=meta or {}, 

863 ) 

864 relationship_to = schema.NendoRelationshipCreate( 

865 source_id=track.id, 

866 target_id=related_track_id, 

867 relationship_type=relationship_type, 

868 meta=meta or {}, 

869 ) 

870 with self.session_scope() as session: 

871 relationship_from = schema.NendoRelationship.model_validate( 

872 self._upsert_track_track_relationship( 

873 relationship=relationship_from, session=session, 

874 ), 

875 ) 

876 _ = schema.NendoRelationship.model_validate( 

877 self._upsert_track_track_relationship( 

878 relationship=relationship_to, session=session, 

879 ), 

880 ) 

881 # avoid redundancy; only append one direction 

882 track.related_tracks.append(relationship_from) 

883 return track 

884 

885 def update_track( 

886 self, 

887 track: schema.NendoTrack, 

888 ) -> schema.NendoTrack: 

889 """Updates the given track by storing it to the database. 

890 

891 Args: 

892 track (NendoTrack): The track to be stored to the database. 

893 

894 Raises: 

895 NendoTrackNotFoundError: If the track passed to the function 

896 does not exist in the database. 

897 

898 Returns: 

899 NendoTrack: The updated track. 

900 """ 

901 with self.session_scope() as session: 

902 # update existing track 

903 db_track = ( 

904 session.query(model.NendoTrackDB).filter_by(id=track.id).one_or_none() 

905 ) 

906 if db_track is None: 

907 raise schema.NendoTrackNotFoundError("Track not found", id=track.id) 

908 db_track.user_id = track.user_id 

909 db_track.visibility = track.visibility 

910 db_track.resource = track.resource.model_dump() 

911 db_track.track_type = track.track_type 

912 db_track.images = track.images 

913 db_track.meta = track.meta 

914 session.commit() 

915 return db_track 

916 

917 def add_plugin_data( 

918 self, 

919 track_id: Union[str, uuid.UUID], 

920 plugin_name: str, 

921 plugin_version: str, 

922 key: str, 

923 value: Any, 

924 user_id: Optional[Union[str, uuid.UUID]] = None, 

925 replace: bool = False, 

926 ) -> schema.NendoPluginData: 

927 """Add plugin data to a NendoTrack and persist changes into the DB. 

928 

929 Args: 

930 track_id (Union[str, UUID]): ID of the track to which 

931 the plugin data should be added. 

932 plugin_name (str): Name of the plugin. 

933 plugin_version (str): Version of the plugin. 

934 key (str): Key under which to save the data. 

935 value (Any): Data to save. 

936 user_id (Union[str, UUID], optional): ID of user adding the plugin data. 

937 replace (bool, optional): Flag that determines whether 

938 the last existing data point for the given plugin name and -version 

939 is overwritten or not. Defaults to False. 

940 

941 Returns: 

942 NendoPluginData: The saved plugin data as a NendoPluginData object. 

943 """ 

944 # create plugin data 

945 user_id = self._ensure_user_uuid(user_id) 

946 value_converted = self._convert_plugin_data(value=value, user_id=user_id) 

947 plugin_data = schema.NendoPluginDataCreate( 

948 track_id=ensure_uuid(track_id), 

949 user_id=ensure_uuid(user_id), 

950 plugin_name=plugin_name, 

951 plugin_version=plugin_version, 

952 key=key, 

953 value=value_converted, 

954 ) 

955 with self.session_scope() as session: 

956 if replace: 

957 existing_plugin_data = self._get_latest_plugin_data_db( 

958 track_id=track_id, 

959 plugin_name=plugin_name, 

960 plugin_version=plugin_version, 

961 key=key, 

962 session=session, 

963 ) 

964 

965 if existing_plugin_data is not None: 

966 db_plugin_data = self._update_plugin_data_db( 

967 existing_plugin_data=existing_plugin_data, 

968 plugin_data=plugin_data, 

969 session=session, 

970 ) 

971 else: 

972 db_plugin_data = self._insert_plugin_data_db( 

973 plugin_data=plugin_data, session=session, 

974 ) 

975 else: 

976 db_plugin_data = self._insert_plugin_data_db( 

977 plugin_data=plugin_data, session=session, 

978 ) 

979 return schema.NendoPluginData.model_validate(db_plugin_data) 

980 

981 def get_track(self, track_id: uuid.UUID) -> schema.NendoTrack: 

982 """Get a single track from the library by ID.""" 

983 with self.session_scope() as session: 

984 track_db = ( 

985 session.query(model.NendoTrackDB) 

986 .filter(model.NendoTrackDB.id == track_id) 

987 .one_or_none() 

988 ) 

989 return ( 

990 schema.NendoTrack.model_validate(track_db) if track_db is not None else None 

991 ) 

992 

993 @schema.NendoPlugin.stream_output 

994 def get_tracks( 

995 self, 

996 query: Optional[Query] = None, 

997 user_id: Optional[Union[str, uuid.UUID]] = None, 

998 order_by: Optional[str] = None, 

999 order: str = "asc", 

1000 limit: Optional[int] = None, 

1001 offset: Optional[int] = None, 

1002 session: Optional[Session] = None, 

1003 ) -> Union[List, Iterator]: 

1004 """Get tracks based on the given query parameters. 

1005 

1006 Args: 

1007 query (Optional[Query]): Query object to build from. 

1008 user_id (Union[str, UUID], optional): ID of user to filter tracks by. 

1009 order_by (Optional[str]): Key used for ordering the results. 

1010 order (Optional[str]): Order in which to retrieve results ("asc" or "desc"). 

1011 limit (Optional[int]): Limit the number of returned results. 

1012 offset (Optional[int]): Offset into the paginated results (requires limit). 

1013 session (sqlalchemy.Session): Session object to commit to. 

1014 

1015 Returns: 

1016 Union[List, Iterator]: List or generator of tracks, depending on the 

1017 configuration variable stream_mode 

1018 """ 

1019 user_id = self._ensure_user_uuid(user_id) 

1020 s = session or self.session_scope() 

1021 with s as session_local: 

1022 if query: 

1023 query_local = query 

1024 else: 

1025 query_local = session_local.query(model.NendoTrackDB).filter( 

1026 model.NendoTrackDB.user_id == user_id, 

1027 ) 

1028 if order_by: 

1029 if order_by == "random": 

1030 query_local = query_local.order_by(func.random()) 

1031 elif order == "desc": 

1032 query_local = query_local.order_by( 

1033 desc(getattr(model.NendoTrackDB, order_by)), 

1034 ) 

1035 else: 

1036 query_local = query_local.order_by( 

1037 asc(getattr(model.NendoTrackDB, order_by)), 

1038 ) 

1039 if limit: 

1040 query_local = query_local.limit(limit) 

1041 if offset: 

1042 query_local = query_local.offset(offset) 

1043 

1044 if self.config.stream_chunk_size > 1: 

1045 chunk = [] 

1046 for track in query_local: 

1047 chunk.append(schema.NendoTrack.model_validate(track)) 

1048 if len(chunk) == self.config.stream_chunk_size: 

1049 yield chunk 

1050 chunk = [] 

1051 if chunk: # yield remaining tracks in non-full chunk 

1052 yield chunk 

1053 else: 

1054 for track in query_local: 

1055 yield schema.NendoTrack.model_validate(track) 

1056 

1057 def get_related_tracks( 

1058 self, 

1059 track_id: Union[str, uuid.UUID], 

1060 user_id: Optional[Union[str, uuid.UUID]] = None, 

1061 order_by: Optional[str] = None, 

1062 order: Optional[str] = "asc", 

1063 limit: Optional[int] = None, 

1064 offset: Optional[int] = None, 

1065 ) -> Union[List, Iterator]: 

1066 """Get tracks with a relationship to the track with track_id. 

1067 

1068 Args: 

1069 track_id (str): ID of the track to be searched for. 

1070 user_id (Union[str, UUID], optional): The user ID to filter for. 

1071 order_by (Optional[str]): Key used for ordering the results. 

1072 order (Optional[str]): Order in which to retrieve results ("asc" or "desc"). 

1073 limit (Optional[int]): Limit the number of returned results. 

1074 offset (Optional[int]): Offset into the paginated results (requires limit). 

1075 

1076 Returns: 

1077 Union[List, Iterator]: List or generator of tracks, depending on the 

1078 configuration variable stream_mode 

1079 """ 

1080 user_id = self._ensure_user_uuid(user_id) 

1081 with self.session_scope() as session: 

1082 query = self._get_related_tracks_query( 

1083 track_id=ensure_uuid(track_id), 

1084 session=session, 

1085 user_id=user_id, 

1086 ) 

1087 return self.get_tracks( 

1088 query=query, 

1089 order_by=order_by, 

1090 order=order, 

1091 limit=limit, 

1092 offset=offset, 

1093 session=session, 

1094 ) 

1095 

1096 def find_tracks( 

1097 self, 

1098 value: str, 

1099 user_id: Optional[Union[str, uuid.UUID]] = None, 

1100 order_by: Optional[str] = None, 

1101 order: str = "asc", 

1102 limit: Optional[int] = None, 

1103 offset: Optional[int] = None, 

1104 ) -> Union[List, Iterator]: 

1105 """Obtain tracks from the db by fulltext search. 

1106 

1107 Args: 

1108 value (str): The value to search for. The value is matched against 

1109 text representations of the track's `meta` and `resource` fields. 

1110 user_id (Union[str, UUID], optional): The user ID to filter for. 

1111 order_by (Optional[str]): Key used for ordering the results. 

1112 limit (Optional[int]): Limit the number of returned results. 

1113 offset (Optional[int]): Offset into the paginated results (requires limit). 

1114 

1115 Returns: 

1116 Union[List, Iterator]: List or generator of tracks, depending on the 

1117 configuration variable stream_mode 

1118 """ 

1119 user_id = self._ensure_user_uuid(user_id) 

1120 with self.session_scope() as session: 

1121 query = session.query(model.NendoTrackDB).filter( 

1122 and_( 

1123 or_( 

1124 cast(model.NendoTrackDB.resource, Text()).ilike( 

1125 "%{}%".format(value), 

1126 ), 

1127 cast(model.NendoTrackDB.meta, Text()).ilike( 

1128 "%{}%".format(value), 

1129 ), 

1130 ), 

1131 model.NendoTrackDB.user_id == user_id, 

1132 ), 

1133 ) 

1134 return self.get_tracks( 

1135 query=query, 

1136 order_by=order_by, 

1137 order=order, 

1138 limit=limit, 

1139 offset=offset, 

1140 session=session, 

1141 ) 

1142 

1143 def filter_tracks( 

1144 self, 

1145 filters: Optional[Dict[str, Any]] = None, 

1146 resource_filters: Optional[Dict[str, Any]] = None, 

1147 track_type: Optional[Union[str, List[str]]] = None, 

1148 user_id: Optional[Union[str, uuid.UUID]] = None, 

1149 collection_id: Optional[Union[str, uuid.UUID]] = None, 

1150 plugin_names: Optional[List[str]] = None, 

1151 order_by: Optional[str] = None, 

1152 order: str = "asc", 

1153 limit: Optional[int] = None, 

1154 offset: Optional[int] = None, 

1155 session: Optional[Session] = None, 

1156 ) -> Union[List, Iterator]: 

1157 """Obtain tracks from the db by filtering over plugin data. 

1158 

1159 Args: 

1160 filters (Optional[dict]): Dictionary containing the filters to apply. 

1161 Defaults to None. 

1162 resource_filters (dict): Dictionary containing the keywords to search for 

1163 over the track.resource.meta field. The dictionary's values 

1164 should contain singular search tokens and the keys currently have no 

1165 effect but might in the future. Defaults to {}. 

1166 track_type (Union[str, List[str]], optional): Track type to filter for. 

1167 Can be a singular type or a list of types. Defaults to None. 

1168 user_id (Union[str, UUID], optional): The user ID to filter for. 

1169 collection_id (Union[str, uuid.UUID], optional): Collection id to 

1170 which the filtered tracks must have a relationship. Defaults to None. 

1171 plugin_names (list, optional): List used for applying the filter only to 

1172 data of certain plugins. If None, all plugin data related to the track 

1173 is used for filtering. 

1174 order_by (str, optional): Key used for ordering the results. 

1175 order (str, optional): Ordering ("asc" vs "desc"). Defaults to "asc". 

1176 limit (int, optional): Limit the number of returned results. 

1177 offset (int, optional): Offset into the paginated results (requires limit). 

1178 

1179 Returns: 

1180 Union[List, Iterator]: List or generator of tracks, depending on the 

1181 configuration variable stream_mode 

1182 """ 

1183 user_id = self._ensure_user_uuid(user_id) 

1184 s = session or self.session_scope() 

1185 with s as session_local: 

1186 """Obtain tracks from the db by filtering w.r.t. various fields.""" 

1187 query = session_local.query(model.NendoTrackDB).filter( 

1188 model.NendoTrackDB.user_id == user_id, 

1189 ) 

1190 plugin_name_condition = true() 

1191 if ( 

1192 plugin_names is not None 

1193 and isinstance(plugin_names, list) 

1194 and len(plugin_names) > 0 

1195 ): 

1196 plugin_name_condition = model.NendoPluginDataDB.plugin_name.in_( 

1197 plugin_names, 

1198 ) 

1199 

1200 # apply track type filter if applicable 

1201 if track_type is not None: 

1202 if isinstance(track_type, list): 

1203 query = query.filter(model.NendoTrackDB.track_type.in_(track_type)) 

1204 else: 

1205 query = query.filter(model.NendoTrackDB.track_type == track_type) 

1206 

1207 # apply collection filter if applicable 

1208 if collection_id is not None: 

1209 collection_id = ensure_uuid(collection_id) 

1210 

1211 query = query.join( 

1212 model.TrackCollectionRelationshipDB, 

1213 model.NendoTrackDB.id 

1214 == model.TrackCollectionRelationshipDB.source_id, 

1215 ).filter( 

1216 model.TrackCollectionRelationshipDB.target_id == collection_id, 

1217 ) 

1218 

1219 # apply resource filters if applicable 

1220 if resource_filters: 

1221 values = [v for v in resource_filters.values() if isinstance(v, list)] 

1222 or_filter = or_( 

1223 *( 

1224 cast(model.NendoTrackDB.resource, Text()).ilike( 

1225 "%{}%".format(str(value)), 

1226 ) 

1227 for rf in values 

1228 for value in rf 

1229 ), 

1230 ) 

1231 query = query.filter(or_filter) 

1232 

1233 # apply plugin data filters 

1234 if filters is not None: 

1235 for k, v in filters.items(): 

1236 if v is None: 

1237 continue 

1238 

1239 # range 

1240 if type(v) == tuple: 

1241 query = query.filter( 

1242 model.NendoTrackDB.plugin_data.any( 

1243 and_( 

1244 plugin_name_condition, 

1245 model.NendoPluginDataDB.key == k, 

1246 cast(model.NendoPluginDataDB.value, Float) 

1247 >= cast(v[0], Float), 

1248 cast(model.NendoPluginDataDB.value, Float) 

1249 <= cast(v[1], Float), 

1250 ), 

1251 ), 

1252 ) 

1253 # multiselect 

1254 elif isinstance(v, list): 

1255 sv = [str(vi) for vi in v] 

1256 query = query.filter( 

1257 model.NendoTrackDB.plugin_data.any( 

1258 and_( 

1259 plugin_name_condition, 

1260 model.NendoPluginDataDB.key == k, 

1261 model.NendoPluginDataDB.value.in_(sv), 

1262 ), 

1263 ), 

1264 ) 

1265 # fuzzy match 

1266 else: 

1267 query = query.filter( 

1268 model.NendoTrackDB.plugin_data.any( 

1269 and_( 

1270 plugin_name_condition, 

1271 model.NendoPluginDataDB.key == k, 

1272 cast(model.NendoPluginDataDB.value, Text()).ilike( 

1273 "%{}%".format(str(v)), 

1274 ), 

1275 ), 

1276 ), 

1277 ) 

1278 

1279 return self.get_tracks( 

1280 query=query, 

1281 order_by=order_by, 

1282 order=order, 

1283 limit=limit, 

1284 offset=offset, 

1285 session=session, 

1286 ) 

1287 

1288 def remove_track( 

1289 self, 

1290 track_id: Union[str, uuid.UUID], 

1291 remove_relationships: bool = False, 

1292 remove_plugin_data: bool = True, 

1293 remove_resources: bool = True, 

1294 user_id: Optional[Union[str, uuid.UUID]] = None, 

1295 ) -> bool: 

1296 """Delete track from library by ID. 

1297 

1298 Args: 

1299 track_id (Union[str, uuid.UUID]): The ID of the track to remove 

1300 remove_relationships (bool): 

1301 If False prevent deletion if related tracks exist, 

1302 if True delete relationships together with the object 

1303 remove_plugin_data (bool): 

1304 If False prevent deletion if related plugin data exist 

1305 if True delete plugin data together with the object 

1306 remove_resources (bool): 

1307 If False, keep the related resources, e.g. files 

1308 if True, delete the related resources 

1309 user_id (Union[str, UUID], optional): The ID of the user owning the track. 

1310 

1311 Returns: 

1312 success (bool): True if removal was successful, False otherwise. 

1313 """ 

1314 user_id = self._ensure_user_uuid(user_id) 

1315 track_id = ensure_uuid(track_id) 

1316 with self.session_scope() as session: 

1317 tracks_with_relations = self._get_related_tracks_query( 

1318 track_id=track_id, session=session, user_id=user_id, 

1319 ).all() 

1320 collections_with_relations = ( 

1321 session.query(model.NendoCollectionDB) 

1322 .join( 

1323 model.TrackCollectionRelationshipDB, 

1324 model.NendoCollectionDB.id 

1325 == model.TrackCollectionRelationshipDB.target_id, 

1326 ) 

1327 .filter(model.TrackCollectionRelationshipDB.source_id == track_id) 

1328 .all() 

1329 ) 

1330 related_plugin_data = self._get_all_plugin_data_db( 

1331 track_id=track_id, session=session, 

1332 ) 

1333 if len(related_plugin_data) > 0: 

1334 if remove_plugin_data: 

1335 logger.info("Removing %d plugin data", len(related_plugin_data)) 

1336 session.query(model.NendoPluginDataDB).filter( 

1337 model.NendoPluginDataDB.track_id == track_id, 

1338 ).delete() 

1339 session.commit() 

1340 else: 

1341 logger.warning( 

1342 "Cannot remove due to %d existing " 

1343 "plugin data entries. Set `remove_plugin_data=True` " 

1344 "to remove them.", len(related_plugin_data), 

1345 ) 

1346 return False 

1347 n_rel = len(tracks_with_relations) + len(collections_with_relations) 

1348 if n_rel > 0: 

1349 if remove_relationships: 

1350 session.query(model.TrackTrackRelationshipDB).filter( 

1351 model.TrackTrackRelationshipDB.target_id == track_id, 

1352 ).delete() 

1353 session.query(model.TrackTrackRelationshipDB).filter( 

1354 model.TrackTrackRelationshipDB.source_id == track_id, 

1355 ).delete() 

1356 # remove track from all collections 

1357 for collection in collections_with_relations: 

1358 self._remove_track_from_collection_db( 

1359 track_id=track_id, 

1360 collection_id=collection.id, 

1361 session=session, 

1362 ) 

1363 session.commit() 

1364 else: 

1365 logger.warning( 

1366 "Cannot remove due to %s existing relationships. " 

1367 "Set `remove_relationships=True` to remove them.", 

1368 n_rel, 

1369 ) 

1370 return False 

1371 # delete actual target 

1372 target = ( 

1373 session.query(model.NendoTrackDB) 

1374 .filter(model.NendoTrackDB.id == track_id) 

1375 .first() 

1376 ) 

1377 session.delete(target) 

1378 # only delete if file has been copied to the library 

1379 # ("original_filepath" is present) 

1380 if ( 

1381 remove_resources 

1382 and "original_filepath" 

1383 in schema.NendoTrack.model_validate(target).resource.meta 

1384 ): 

1385 logger.info("Removing resources associated with Track %s", str(track_id)) 

1386 return self.storage_driver.remove_file( 

1387 file_name=target.resource["file_name"], 

1388 user_id=str(user_id) if user_id else str(self.user.id), 

1389 ) 

1390 return True 

1391 

1392 def export_track( 

1393 self, 

1394 track_id: Union[str, uuid.UUID], 

1395 file_path: str, 

1396 file_format: str = "wav", 

1397 ) -> str: 

1398 """Export the track to a file. 

1399 

1400 Args: 

1401 track_id (Union[str, uuid.UUID]): The ID of the target track to export. 

1402 file_path (str): Path to the exported file. Can be either a full 

1403 file path or a directory path. If a directory path is given, 

1404 a filename will be automatically generated and the file will be 

1405 exported to the format specified as file_format. If a full file 

1406 path is given, the format will be deduced from the path and the 

1407 file_format parameter will be ignored. 

1408 file_format (str, optional): Format of the exported track. Ignored if 

1409 file_path is a full file path. Defaults to "wav". 

1410 

1411 Returns: 

1412 str: The path to the exported file. 

1413 """ 

1414 track = self.get_track(track_id=track_id) 

1415 # Check if file_path is a directory 

1416 if os.path.isdir(file_path): 

1417 # Generate a filename with timestamp 

1418 if track.has_meta("original_filename"): 

1419 original_filename = track.get_meta("original_filename") 

1420 else: 

1421 original_filename = track.resource.file_name 

1422 file_name = ( 

1423 f"{original_filename}_nendo_" 

1424 f"{datetime.now().strftime('%Y%m%d%H%M%S')}" # noqa: DTZ005 

1425 f".{file_format}" 

1426 ) 

1427 file_path = os.path.join(file_path, file_name) 

1428 else: 

1429 # Deduce file format from file extension 

1430 file_format = os.path.splitext(file_path)[1].lstrip(".") 

1431 

1432 # Exporting the audio 

1433 temp_path = None 

1434 signal = track.signal 

1435 signal = np.transpose(signal) if signal.shape[0] <= 2 else signal 

1436 if file_format in ("wav", "ogg"): 

1437 sf.write(file_path, signal, track.sr, format=file_format) 

1438 elif file_format == "mp3": 

1439 # Create a temporary WAV file for conversion 

1440 temp_path = file_path.rsplit(".", 1)[0] + ".wav" 

1441 sf.write(temp_path, signal, track.sr) 

1442 subprocess.run( 

1443 ["ffmpeg", "-i", temp_path, "-acodec", "libmp3lame", file_path], 

1444 ) 

1445 else: 

1446 raise ValueError( 

1447 "Unsupported file format. Supported formats are 'wav', 'mp3', and 'ogg'.", 

1448 ) 

1449 

1450 # Clean up temporary file if used 

1451 if temp_path and os.path.exists(temp_path): 

1452 os.remove(temp_path) 

1453 

1454 return file_path 

1455 

1456 # =============================== 

1457 # 

1458 # COLLECTION MANAGEMENT FUNCTIONS 

1459 # 

1460 # =============================== 

1461 

1462 def _get_related_collections_query( 

1463 self, 

1464 collection_id: uuid.UUID, 

1465 session: Session, 

1466 user_id: Optional[uuid.UUID] = None, 

1467 ) -> Query: 

1468 """Create a query for the collections related to a given collection. 

1469 

1470 Args: 

1471 collection_id (UUID): ID of the collection to be searched for. 

1472 session (Session): Session to be used for the transaction. 

1473 user_id (UUID, optional): The user ID to filter for. 

1474 

1475 Returns: 

1476 Query: The SQLAlchemy query object. 

1477 """ 

1478 user_id = user_id or self.user.id 

1479 return ( 

1480 session.query(model.NendoCollectionDB) 

1481 .filter(model.NendoCollectionDB.user_id == user_id) 

1482 .options( 

1483 joinedload(model.NendoCollectionDB.related_tracks).joinedload( 

1484 model.TrackCollectionRelationshipDB.source, 

1485 ), 

1486 ) 

1487 .join( 

1488 model.CollectionCollectionRelationshipDB, 

1489 model.NendoCollectionDB.id 

1490 == model.CollectionCollectionRelationshipDB.source_id, 

1491 ) 

1492 .filter( 

1493 model.CollectionCollectionRelationshipDB.target_id == collection_id, 

1494 ) 

1495 ) 

1496 

1497 def _remove_track_from_collection_db( 

1498 self, 

1499 track_id: uuid.UUID, 

1500 collection_id: uuid.UUID, 

1501 session: Session, 

1502 ) -> bool: 

1503 """Deletes a relationship from the track to the collection in the db. 

1504 

1505 Args: 

1506 collection_id (uuid.UUID): Collection id. 

1507 track_id (uuid.UUID): Track id. 

1508 session (sqlalchemy.Session): Session object 

1509 

1510 Returns: 

1511 success (bool): True if removal was successful, False otherwise. 

1512 """ 

1513 target_relationship = ( 

1514 session.query(model.TrackCollectionRelationshipDB) 

1515 .filter( 

1516 and_( 

1517 model.TrackCollectionRelationshipDB.source_id == track_id, 

1518 model.TrackCollectionRelationshipDB.target_id == collection_id, 

1519 ), 

1520 ) 

1521 .first() 

1522 ) 

1523 if target_relationship is None: 

1524 raise schema.NendoRelationshipNotFoundError( 

1525 "Relationship not found", track_id, collection_id, 

1526 ) 

1527 session.delete(target_relationship) 

1528 # Adjust positions of other states 

1529 rel_db = model.TrackCollectionRelationshipDB 

1530 session.query(model.TrackCollectionRelationshipDB).filter( 

1531 rel_db.target_id == collection_id, 

1532 rel_db.relationship_position 

1533 > target_relationship.relationship_position, 

1534 ).update({rel_db.relationship_position: rel_db.relationship_position - 1}) 

1535 

1536 def add_collection( 

1537 self, 

1538 name: str, 

1539 user_id: Optional[Union[str, uuid.UUID]] = None, 

1540 track_ids: Optional[List[Union[str, uuid.UUID]]] = None, 

1541 description: str = "", 

1542 collection_type: str = "collection", 

1543 visibility: schema.Visibility = schema.Visibility.private, 

1544 meta: Optional[Dict[str, Any]] = None, 

1545 ) -> schema.NendoCollection: 

1546 """Creates a new collection and saves it into the DB. 

1547 

1548 Args: 

1549 track_ids (List[Union[str, uuid.UUID]]): List of track ids 

1550 to be added to the collection. 

1551 name (str): Name of the collection. 

1552 user_id (UUID, optional): The ID of the user adding the collection. 

1553 description (str): Description of the collection. 

1554 collection_type (str): Type of the collection. 

1555 meta (Dict[str, Any]): Metadata of the collection. 

1556 

1557 Returns: 

1558 schema.NendoCollection: The newly created NendoCollection object. 

1559 """ 

1560 user_id = self._ensure_user_uuid(user_id) 

1561 if track_ids is None: 

1562 track_ids = [] 

1563 with self.session_scope() as session: 

1564 # Fetch the track objects 

1565 track_objs = ( 

1566 session.query(model.NendoTrackDB) 

1567 .filter( 

1568 and_( 

1569 model.NendoTrackDB.id.in_( 

1570 [ 

1571 uuid.UUID(t) if isinstance(t, str) else t 

1572 for t in track_ids 

1573 ], 

1574 ), 

1575 model.NendoTrackDB.user_id == user_id, 

1576 ), 

1577 ) 

1578 .all() 

1579 ) 

1580 

1581 # Create a new collection object 

1582 new_collection = model.NendoCollectionDB( 

1583 name=name, 

1584 user_id=user_id, 

1585 description=description, 

1586 collection_type=collection_type, 

1587 visibility=visibility, 

1588 meta=meta or {}, 

1589 ) 

1590 session.add(new_collection) 

1591 session.commit() 

1592 session.refresh(new_collection) 

1593 # Create relationships from tracks to collection 

1594 for idx, track in enumerate(track_objs): 

1595 tc_relationship = model.TrackCollectionRelationshipDB( 

1596 source_id=track.id, 

1597 target_id=new_collection.id, 

1598 relationship_type="track", 

1599 meta={}, 

1600 relationship_position=idx, 

1601 ) 

1602 session.add(tc_relationship) 

1603 session.commit() 

1604 session.refresh(new_collection) 

1605 

1606 return schema.NendoCollection.model_validate(new_collection) 

1607 

1608 def add_related_collection( 

1609 self, 

1610 track_ids: List[Union[str, uuid.UUID]], 

1611 collection_id: Union[str, uuid.UUID], 

1612 name: str, 

1613 description: str = "", 

1614 user_id: Optional[Union[str, uuid.UUID]] = None, 

1615 relationship_type: str = "relationship", 

1616 meta: Optional[Dict[str, Any]] = None, 

1617 ) -> schema.NendoCollection: 

1618 """Adds a new collection with a relationship to the given collection. 

1619 

1620 Args: 

1621 track_ids (List[Union[str, uuid.UUID]]): List of track ids. 

1622 collection_id (Union[str, uuid.UUID]): Existing collection id. 

1623 name (str): Name of the new related collection. 

1624 description (str): Description of the new related collection. 

1625 user_id (UUID, optional): The ID of the user adding the collection. 

1626 relationship_type (str): Type of the relationship. 

1627 meta (Dict[str, Any]): Meta of the new related collection. 

1628 

1629 Returns: 

1630 schema.NendoCollection: The newly added NendoCollection object. 

1631 """ 

1632 user_id = self._ensure_user_uuid(user_id) 

1633 # Create a new collection 

1634 new_collection = self.add_collection( 

1635 name=name, 

1636 user_id=user_id, 

1637 track_ids=track_ids, 

1638 description=description, 

1639 collection_type=relationship_type, 

1640 meta=meta, 

1641 ) 

1642 

1643 with self.session_scope() as session: 

1644 if isinstance(collection_id, str): 

1645 collection_id = uuid.UUID(collection_id) 

1646 collection = ( 

1647 session.query(model.NendoCollectionDB) 

1648 .filter_by(id=collection_id) 

1649 .first() 

1650 ) 

1651 

1652 # Check if the collection does not exist 

1653 if not collection: 

1654 raise schema.NendoCollectionNotFoundError( 

1655 "Collection not found", id=collection_id, 

1656 ) 

1657 

1658 # create bidirectional relationship 

1659 relationship_from = schema.NendoRelationshipCreate( 

1660 source_id=new_collection.id, 

1661 target_id=collection_id, 

1662 relationship_type=relationship_type, 

1663 meta=meta or {}, 

1664 ) 

1665 relationship_to = schema.NendoRelationshipCreate( 

1666 source_id=collection_id, 

1667 target_id=new_collection.id, 

1668 relationship_type=relationship_type, 

1669 meta=meta or {}, 

1670 ) 

1671 relationship_from = model.CollectionCollectionRelationshipDB( 

1672 **relationship_from.model_dump(), 

1673 ) 

1674 relationship_to = model.CollectionCollectionRelationshipDB( 

1675 **relationship_to.model_dump(), 

1676 ) 

1677 session.add(relationship_from) 

1678 session.add(relationship_to) 

1679 

1680 new_collection.related_collections.append(relationship_from) 

1681 return new_collection 

1682 

1683 def add_track_to_collection( 

1684 self, 

1685 track_id: Union[str, uuid.UUID], 

1686 collection_id: Union[str, uuid.UUID], 

1687 position: Optional[int] = None, 

1688 meta: Optional[Dict[str, Any]] = None, 

1689 ) -> schema.NendoCollection: 

1690 """Creates a relationship from the track to the collection. 

1691 

1692 Args: 

1693 track_id (Union[str, uuid.UUID]): ID of the track to add. 

1694 collection_id (Union[str, uuid.UUID]): ID of the collection to 

1695 which to add the track. 

1696 position (int, optional): Target position of the track inside 

1697 the collection. 

1698 meta (Dict[str, Any]): Metadata of the relationship. 

1699 

1700 Returns: 

1701 schema.NendoCollection: The updated NendoCollection object. 

1702 """ 

1703 with self.session_scope() as session: 

1704 # Convert IDs to UUIDs if they're strings 

1705 if isinstance(collection_id, str): 

1706 collection_id = uuid.UUID(collection_id) 

1707 if isinstance(track_id, str): 

1708 track_id = uuid.UUID(track_id) 

1709 

1710 # Check the collection and track objects 

1711 collection = ( 

1712 session.query(model.NendoCollectionDB) 

1713 .filter_by(id=collection_id) 

1714 .first() 

1715 ) 

1716 track = session.query(model.NendoTrackDB).filter_by(id=track_id).first() 

1717 if not collection: 

1718 raise schema.NendoCollectionNotFoundError( 

1719 "The collection does not exist", collection_id, 

1720 ) 

1721 if not track: 

1722 raise schema.NendoCollectionNotFoundError( 

1723 "The track does not exist", track_id, 

1724 ) 

1725 

1726 rc_rel_db = model.TrackCollectionRelationshipDB 

1727 if position is not None: 

1728 # Update other states to keep ordering consistent 

1729 session.query(rc_rel_db).filter( 

1730 rc_rel_db.target_id == collection_id, 

1731 rc_rel_db.relationship_position >= position, 

1732 ).update( 

1733 { 

1734 rc_rel_db.relationship_position: rc_rel_db.relationship_position 

1735 + 1, 

1736 }, 

1737 ) 

1738 else: 

1739 # If no position specified, add at the end 

1740 last_state = ( 

1741 session.query(model.TrackCollectionRelationshipDB) 

1742 .filter_by(target_id=collection_id) 

1743 .order_by( 

1744 model.TrackCollectionRelationshipDB.relationship_position.desc(), 

1745 ) 

1746 .first() 

1747 ) 

1748 position = last_state.relationship_position + 1 if last_state else 0 

1749 

1750 # Create a relationship from the track to the collection 

1751 tc_relationship = model.TrackCollectionRelationshipDB( 

1752 source_id=track_id, 

1753 target_id=collection_id, 

1754 relationship_type="track", 

1755 meta=meta or {}, 

1756 relationship_position=position, 

1757 ) 

1758 session.add(tc_relationship) 

1759 session.commit() 

1760 session.refresh(collection) 

1761 return schema.NendoCollection.model_validate(collection) 

1762 

1763 def get_collection_tracks( 

1764 self, collection_id: uuid.UUID, 

1765 ) -> List[schema.NendoTrack]: 

1766 """Get all tracks from a collection. 

1767 

1768 Args: 

1769 collection_id (uuid.UUID): ID of the collection to get the tracks from. 

1770 

1771 Returns: 

1772 List[schema.NendoTrack]: List of tracks in the collection. 

1773 """ 

1774 with self.session_scope() as session: 

1775 tracks_db = ( 

1776 session.query(model.NendoTrackDB) 

1777 .join( 

1778 model.TrackCollectionRelationshipDB, 

1779 model.TrackCollectionRelationshipDB.source_id 

1780 == model.NendoTrackDB.id, 

1781 ) 

1782 .options(joinedload(model.NendoTrackDB.related_collections)) 

1783 .filter(model.TrackCollectionRelationshipDB.target_id == collection_id) 

1784 .all() 

1785 ) 

1786 

1787 return ( 

1788 [schema.NendoTrack.model_validate(t) for t in tracks_db] 

1789 if tracks_db is not None 

1790 else None 

1791 ) 

1792 

1793 def get_collection( 

1794 self, collection_id: uuid.UUID, details: bool = True, 

1795 ) -> Union[schema.NendoCollection, schema.NendoCollectionSlim]: 

1796 """Get a collection by its ID. 

1797 

1798 Args: 

1799 collection_id (uuid.UUID): ID of the target collection. 

1800 details (bool, optional): Flag that defines whether the result should 

1801 contain all fields or only a subset. Defaults to True. 

1802 

1803 Returns: 

1804 Union[NendoCollection, NendoCollectionSlim]: Collection object, compact 

1805 version if the `details` flag has been set to False. 

1806 """ 

1807 with self.session_scope() as session: 

1808 query = session.query(model.NendoCollectionDB) 

1809 if details: 

1810 query = query.options( 

1811 joinedload(model.NendoCollectionDB.related_tracks).joinedload( 

1812 model.TrackCollectionRelationshipDB.source, 

1813 ), 

1814 ) 

1815 collection_db = query.filter( 

1816 model.NendoCollectionDB.id == collection_id, 

1817 ).first() 

1818 

1819 if details: 

1820 collection = ( 

1821 schema.NendoCollection.model_validate(collection_db) 

1822 if collection_db is not None 

1823 else None 

1824 ) 

1825 else: 

1826 collection = ( 

1827 schema.NendoCollectionSlim.model_validate(collection_db) 

1828 if collection_db is not None 

1829 else None 

1830 ) 

1831 return collection 

1832 

1833 def get_collections( 

1834 self, 

1835 query: Optional[Query] = None, 

1836 user_id: Optional[Union[str, uuid.UUID]] = None, 

1837 order_by: Optional[str] = None, 

1838 order: Optional[str] = "asc", 

1839 limit: Optional[int] = None, 

1840 offset: Optional[int] = None, 

1841 session: Optional[Session] = None, 

1842 ) -> Union[List, Iterator]: 

1843 """Get a list of collections. 

1844 

1845 Args: 

1846 query (Optional[Query]): Query object to build from. 

1847 user_id (Union[str, UUID], optional): The user ID to filter for. 

1848 order_by (Optional[str]): Key used for ordering the results. 

1849 order (Optional[str]): Order in which to retrieve results 

1850 ("asc" or "desc"). 

1851 limit (Optional[int]): Limit the number of returned results. 

1852 offset (Optional[int]): Offset into the paginated results (requires limit). 

1853 session (sqlalchemy.Session): Session object to commit to. 

1854 

1855 Returns: 

1856 Union[List, Iterator]: List or generator of collections, depending on the 

1857 configuration variable stream_mode 

1858 """ 

1859 user_id = self._ensure_user_uuid(user_id) 

1860 with self.session_scope() as session: 

1861 query = session.query(model.NendoCollectionDB).filter( 

1862 model.NendoCollectionDB.user_id == user_id, 

1863 ) 

1864 return self._get_collections_db( 

1865 query, user_id, order_by, order, limit, offset, session, 

1866 ) 

1867 

1868 @schema.NendoPlugin.stream_output 

1869 def _get_collections_db( 

1870 self, 

1871 query: Optional[Query] = None, 

1872 user_id: Optional[Union[str, uuid.UUID]] = None, 

1873 order_by: Optional[str] = None, 

1874 order: Optional[str] = "asc", 

1875 limit: Optional[int] = None, 

1876 offset: Optional[int] = None, 

1877 session: Optional[Session] = None, 

1878 ) -> Union[List, Iterator]: 

1879 """Get a list of collections from the DB.""" 

1880 user_id = self._ensure_user_uuid(user_id) 

1881 s = session or self.session_scope() 

1882 with s as session_local: 

1883 if query: 

1884 query_local = query 

1885 else: 

1886 query_local = session_local.query(model.NendoCollectionDB).filter( 

1887 model.NendoCollectionDB.user_id == user_id, 

1888 ) 

1889 

1890 if order_by: 

1891 if order_by == "random": 

1892 query_local = query_local.order_by(func.random()) 

1893 elif order == "desc": 

1894 query_local = query_local.order_by(desc(order_by)) 

1895 else: 

1896 query_local = query_local.order_by(asc(order_by)) 

1897 

1898 if limit: 

1899 query_local = query_local.limit(limit) 

1900 if offset: 

1901 query_local = query_local.offset(offset) 

1902 

1903 if self.config.stream_chunk_size > 1: 

1904 chunk = [] 

1905 for collection in query: 

1906 chunk.append(schema.NendoCollection.model_validate(collection)) 

1907 if len(chunk) == self.config.stream_chunk_size: 

1908 yield chunk 

1909 chunk = [] 

1910 if chunk: # yield remaining tracks in non-full chunk 

1911 yield chunk 

1912 else: 

1913 for collection in query: 

1914 yield schema.NendoCollection.model_validate(collection) 

1915 

1916 def find_collections( 

1917 self, 

1918 value: str = "", 

1919 user_id: Optional[Union[str, uuid.UUID]] = None, 

1920 order_by: Optional[str] = None, 

1921 order: Optional[str] = "asc", 

1922 limit: Optional[int] = None, 

1923 offset: Optional[int] = None, 

1924 ) -> Union[List, Iterator]: 

1925 """Find collections with a search term in the description or meta field. 

1926 

1927 Args: 

1928 value (str): Term to be searched for in the description and meta field. 

1929 user_id (Union[str, UUID], optional): The user ID to filter for. 

1930 order_by (Optional[str]): Key used for ordering the results. 

1931 order (Optional[str]): Order in which to retrieve results ("asc" or "desc"). 

1932 limit (Optional[int]): Limit the number of returned results. 

1933 offset (Optional[int]): Offset into the paginated results (requires limit). 

1934 

1935 Returns: 

1936 Union[List, Iterator]: List or generator of collections, depending on the 

1937 configuration variable stream_mode 

1938 """ 

1939 user_id = self._ensure_user_uuid(user_id) 

1940 with self.session_scope() as session: 

1941 query = session.query(model.NendoCollectionDB).filter( 

1942 and_( 

1943 or_( 

1944 model.NendoCollectionDB.name.ilike(f"%{value}%"), 

1945 model.NendoCollectionDB.description.ilike(f"%{value}%"), 

1946 # cast( 

1947 # model.NendoCollectionDB.meta, Text()).ilike(f"%{value}%" 

1948 # ), 

1949 ), 

1950 model.NendoCollectionDB.user_id == user_id, 

1951 ) 

1952 ) 

1953 return self._get_collections_db( 

1954 query, user_id, order_by, order, limit, offset, session, 

1955 ) 

1956 

1957 def get_related_collections( 

1958 self, 

1959 collection_id: Union[str, uuid.UUID], 

1960 user_id: Optional[Union[str, uuid.UUID]] = None, 

1961 order_by: Optional[str] = None, 

1962 order: Optional[str] = "asc", 

1963 limit: Optional[int] = None, 

1964 offset: Optional[int] = None, 

1965 ) -> Union[List, Iterator]: 

1966 """Get collections with a relationship to the collection with collection_id. 

1967 

1968 Args: 

1969 collection_id (str): ID of the collection to be searched for. 

1970 user_id (Union[str, UUID], optional): The user ID to filter for. 

1971 order_by (Optional[str]): Key used for ordering the results. 

1972 order (Optional[str]): Order in which to retrieve results ("asc" or "desc"). 

1973 limit (Optional[int]): Limit the number of returned results. 

1974 offset (Optional[int]): Offset into the paginated results (requires limit). 

1975 

1976 Returns: 

1977 Union[List, Iterator]: List or generator of collections, depending on the 

1978 configuration variable stream_mode 

1979 """ 

1980 user_id = self._ensure_user_uuid(user_id) 

1981 with self.session_scope() as session: 

1982 query = self._get_related_collections_query( 

1983 collection_id=ensure_uuid(collection_id), 

1984 session=session, 

1985 user_id=user_id, 

1986 ) 

1987 return self._get_collections_db(query, order_by, order, limit, offset, session) 

1988 

1989 def remove_track_from_collection( 

1990 self, 

1991 track_id: Union[str, uuid.UUID], 

1992 collection_id: Union[str, uuid.UUID], 

1993 ) -> bool: 

1994 """Deletes a relationship from the track to the collection. 

1995 

1996 Args: 

1997 track_id (Union[str, uuid.UUID]): ID 

1998 collection_id (Union[str, uuid.UUID]): Collection id. 

1999 

2000 Returns: 

2001 success (bool): True if removal was successful, False otherwise. 

2002 """ 

2003 with self.session_scope() as session: 

2004 collection_id = ensure_uuid(collection_id) 

2005 track_id = ensure_uuid(track_id) 

2006 

2007 # # Check the collection and track objects 

2008 # collection = ( 

2009 # session.query(model.NendoCollectionDB) 

2010 # .filter_by(id=collection_id) 

2011 # .first() 

2012 # ) 

2013 # track = session.query(model.NendoTrackDB).filter_by(id=track_id).first() 

2014 # if not collection: 

2015 # raise ValueError("The collection does not exist") 

2016 # if not track: 

2017 # raise ValueError("The track does not exist") 

2018 

2019 # Remove the relationship from the track to the collection 

2020 return self._remove_track_from_collection_db( 

2021 track_id=track_id, collection_id=collection_id, session=session, 

2022 ) 

2023 

2024 def update_collection( 

2025 self, 

2026 collection: schema.NendoCollection, 

2027 ) -> schema.NendoCollection: 

2028 """Updates the given collection by storing it to the database. 

2029 

2030 Args: 

2031 collection (NendoCollection): The collection to store. 

2032 

2033 Raises: 

2034 NendoCollectionNotFoundError: If the collection with 

2035 the given ID was not found. 

2036 

2037 Returns: 

2038 NendoCollection: The updated collection. 

2039 """ 

2040 with self.session_scope() as session: 

2041 collection_db = ( 

2042 session.query(model.NendoCollectionDB) 

2043 .filter_by(id=collection.id) 

2044 .first() 

2045 ) 

2046 

2047 if collection_db is None: 

2048 raise schema.NendoCollectionNotFoundError( 

2049 "Collection not found", collection.id, 

2050 ) 

2051 

2052 collection_db.name = collection.name 

2053 collection_db.user_id = collection.user_id 

2054 collection_db.description = collection.description 

2055 collection_db.collection_type = collection.collection_type 

2056 collection_db.visibility = collection.visibility 

2057 collection_db.meta = collection.meta 

2058 

2059 session.commit() 

2060 session.refresh(collection_db) 

2061 return schema.NendoCollection.model_validate(collection_db) 

2062 

2063 def remove_collection( 

2064 self, 

2065 collection_id: uuid.UUID, 

2066 remove_relationships: bool = False, 

2067 ) -> bool: 

2068 """Deletes the collection identified by `collection_id`. 

2069 

2070 Args: 

2071 collection_id (uuid.UUID): ID of the collection to remove. 

2072 remove_relationships (bool, optional): 

2073 If False prevent deletion if related tracks exist, 

2074 if True delete relationships together with the object. 

2075 Defaults to False. 

2076 

2077 Returns: 

2078 bool: True if deletion was successful, False otherwise. 

2079 """ 

2080 with self.session_scope() as session: 

2081 # has_related_track = ( 

2082 # session.query(model.TrackCollectionRelationshipDB) 

2083 # .filter( 

2084 # model.TrackCollectionRelationshipDB.target_id 

2085 # == collection_id 

2086 # ) 

2087 # .first() 

2088 # ) is not False 

2089 has_related_collection = ( 

2090 session.query(model.CollectionCollectionRelationshipDB) 

2091 .filter( 

2092 or_( 

2093 model.CollectionCollectionRelationshipDB.source_id 

2094 == collection_id, 

2095 model.CollectionCollectionRelationshipDB.target_id 

2096 == collection_id, 

2097 ), 

2098 ) 

2099 .first() 

2100 ) is not None 

2101 

2102 if has_related_collection: # or has_related_track: 

2103 if remove_relationships: 

2104 session.query(model.CollectionCollectionRelationshipDB).filter( 

2105 or_( 

2106 model.CollectionCollectionRelationshipDB.target_id 

2107 == collection_id, 

2108 model.CollectionCollectionRelationshipDB.source_id 

2109 == collection_id, 

2110 ), 

2111 ).delete() 

2112 session.commit() 

2113 else: 

2114 logger.warning( 

2115 "Cannot remove due to existing relationships. " 

2116 "Set `remove_relationships=True` to remove them.", 

2117 ) 

2118 return False 

2119 

2120 # clean up track-collection relationships 

2121 session.query(model.TrackCollectionRelationshipDB).filter( 

2122 model.TrackCollectionRelationshipDB.target_id == collection_id, 

2123 ).delete() 

2124 session.commit() 

2125 

2126 # delete actual target 

2127 session.query(model.NendoCollectionDB).filter( 

2128 model.NendoCollectionDB.id == collection_id, 

2129 ).delete() 

2130 

2131 return True 

2132 

2133 def export_collection( 

2134 self, 

2135 collection_id: Union[str, uuid.UUID], 

2136 export_path: str, 

2137 filename_suffix: str = "nendo", 

2138 file_format: str = "wav", 

2139 ) -> List[str]: 

2140 """Export the collection to a directory. 

2141 

2142 Args: 

2143 collection_id (Union[str, uuid.UUID]): The ID of the target 

2144 collection to export. 

2145 export_path (str): Path to a directory into which the collection's tracks 

2146 should be exported. 

2147 filename_suffix (str): The suffix which should be appended to each 

2148 exported track's filename. 

2149 file_format (str, optional): Format of the exported track. Ignored if 

2150 file_path is a full file path. Defaults to "wav". 

2151 

2152 Returns: 

2153 List[str]: A list with all full paths to the exported files. 

2154 """ 

2155 collection_tracks = self.get_collection_tracks(collection_id) 

2156 now = datetime.now().strftime("%Y%m%d%H%M%S") # noqa: DTZ005 

2157 if not os.path.isdir(export_path): 

2158 logger.info( 

2159 f"Export path {export_path} does not exist, creating now.", 

2160 ) 

2161 os.makedirs(export_path, exist_ok=True) 

2162 track_file_paths = [] 

2163 for track in collection_tracks: 

2164 if track.has_meta("original_filename"): 

2165 original_filename = track.get_meta("original_filename") 

2166 else: 

2167 original_filename = track.resource.file_name 

2168 file_name = f"{original_filename}_{filename_suffix}_{now}.{file_format}" 

2169 file_path = os.path.join(export_path, file_name) 

2170 track_file_path = self.export_track( 

2171 track_id = track.id, 

2172 file_path = file_path, 

2173 file_format = file_format, 

2174 ) 

2175 track_file_paths.append(track_file_path) 

2176 return track_file_paths 

2177 

2178 # ========================= 

2179 # 

2180 # BLOB MANAGEMENT FUNCTIONS 

2181 # 

2182 # ========================= 

2183 

2184 def _upsert_blob_db( 

2185 self, blob: schema.NendoBlobBase, session: Session, 

2186 ) -> model.NendoBlobDB: 

2187 """Create blob in DB or update if it exists. 

2188 

2189 Args: 

2190 blob (schema.NendoBlobCreate): Blob object to be created 

2191 session (Session): Session to be used for the transaction 

2192 

2193 Returns: 

2194 model.NendoBlobDB: The ORM model object of the upserted blob 

2195 """ 

2196 if type(blob) == schema.NendoBlobCreate: 

2197 # create new blob 

2198 db_blob = model.NendoBlobDB(**blob.model_dump()) 

2199 session.add(db_blob) 

2200 else: 

2201 # update existing blob 

2202 db_blob = ( 

2203 session.query(model.NendoBlobDB).filter_by(id=blob.id).one_or_none() 

2204 ) 

2205 if db_blob is None: 

2206 raise schema.NendoBlobNotFoundError("Blob not found", target_id=blob.id) 

2207 db_blob.resource = blob.resource.model_dump() 

2208 session.commit() 

2209 return db_blob 

2210 

2211 def _create_blob_from_bytes( 

2212 self, 

2213 data: bytes, 

2214 user_id: Optional[uuid.UUID] = None, 

2215 ) -> schema.NendoBlobCreate: 

2216 """Create a blob from the given bytes.""" 

2217 target_file = None 

2218 try: 

2219 target_file = self.storage_driver.save_bytes( 

2220 file_name=self.storage_driver.generate_filename( 

2221 filetype="pkl", 

2222 user_id=str(user_id) if user_id else str(self.user.id), 

2223 ), 

2224 data=data, 

2225 user_id=str(user_id) if user_id else str(self.user.id), 

2226 ) 

2227 except Exception as e: # noqa: BLE001 

2228 raise schema.NendoLibraryError( 

2229 f"Failed writing file {target_file} to the library. Error: {e}.", 

2230 ) from None 

2231 resource = schema.NendoResource( 

2232 file_path=self.storage_driver.get_file_path( 

2233 src=target_file, 

2234 user_id=str(user_id) if user_id else str(self.user.id), 

2235 ), 

2236 file_name=self.storage_driver.get_file_name( 

2237 src=target_file, 

2238 user_id=str(user_id) if user_id else str(self.user.id), 

2239 ), 

2240 resource_type="blob", 

2241 location=self.storage_driver.get_driver_location(), 

2242 meta={}, 

2243 ) 

2244 return schema.NendoBlobCreate(resource=resource.model_dump(), user_id=self.user.id) 

2245 

2246 def _create_blob_from_file( 

2247 self, 

2248 file_path: FilePath, 

2249 copy_to_library: Optional[bool] = None, 

2250 user_id: Optional[uuid.UUID] = None, 

2251 ) -> schema.NendoBlobCreate: 

2252 """Create a blob from a given filepath.""" 

2253 target_file = None 

2254 if not os.path.isfile(file_path): 

2255 raise schema.NendoResourceError("File not found", file_path) 

2256 

2257 copy_to_library = copy_to_library or self.config.copy_to_library 

2258 meta = {} 

2259 meta.update({"checksum": md5sum(file_path)}) 

2260 if copy_to_library: 

2261 try: 

2262 file_stats = os.stat(file_path) 

2263 meta.update( 

2264 { 

2265 "original_filename": os.path.basename(file_path), 

2266 "original_filepath": os.path.dirname(file_path), 

2267 "original_size": file_stats.st_size, 

2268 "original_checksum": md5sum(file_path), 

2269 }, 

2270 ) 

2271 

2272 target_file = self.storage_driver.save_file( 

2273 file_name=self.storage_driver.generate_filename( 

2274 filetype=os.path.splitext(file_path)[1][1:], # without the dot 

2275 user_id=user_id, 

2276 ), 

2277 file_path=file_path, 

2278 user_id=str(user_id) if user_id else str(self.user.id), 

2279 ) 

2280 location = self.storage_driver.get_driver_location() 

2281 except Exception as e: # noqa: BLE001 

2282 raise schema.NendoLibraryError( 

2283 f"Failed storing blob {target_file}. Error: {e}.", 

2284 ) from None 

2285 else: 

2286 target_file = file_path 

2287 location = schema.ResourceLocation.original 

2288 

2289 resource = schema.NendoResource( 

2290 file_path=os.path.dirname(target_file), 

2291 file_name=os.path.basename(target_file), 

2292 location=location, 

2293 meta=meta or {}, 

2294 ) 

2295 return schema.NendoBlobCreate(resource=resource.model_dump(), user_id=self.user.id) 

2296 

2297 def load_blob( 

2298 self, blob_id: uuid.UUID, user_id: Optional[Union[str, uuid.UUID]] = None, 

2299 ) -> schema.NendoBlob: 

2300 """Loads a blob of data into memory. 

2301 

2302 Args: 

2303 blob_id (uuid.UUID): The UUID of the blob. 

2304 user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user 

2305 who's loading the blob. 

2306 

2307 Returns: 

2308 schema.NendoBlob: The loaded blob. 

2309 """ 

2310 user_id = self._ensure_user_uuid(user_id) 

2311 with self.session_scope() as session: 

2312 blob_db = ( 

2313 session.query(model.NendoBlobDB) 

2314 .filter(model.NendoBlobDB.id == blob_id) 

2315 .one_or_none() 

2316 ) 

2317 if blob_db is not None: 

2318 blob = schema.NendoBlob.model_validate(blob_db) 

2319 local_blob = self.storage_driver.as_local( 

2320 file_path=blob.resource.src, 

2321 location=blob.resource.location, 

2322 user_id=user_id, 

2323 ) 

2324 

2325 # load blob data into memory 

2326 if os.path.splitext(local_blob)[1] == ".pkl": 

2327 with open(local_blob, "rb") as f: 

2328 blob.data = pickle.load(f) # noqa: S301 

2329 elif os.path.splitext(local_blob)[1] == ".npy": 

2330 blob.data = np.load(local_blob) 

2331 elif os.path.splitext(local_blob)[1] == ".wav": 

2332 librosa.load(local_blob, mono=False) 

2333 else: 

2334 logger.error( 

2335 "Blob file format not recognized. " 

2336 "Returning blob with empty data.", 

2337 ) 

2338 

2339 return blob 

2340 

2341 def store_blob( 

2342 self, file_path: Union[str, FilePath], user_id: Optional[Union[str, uuid.UUID]] = None 

2343 ) -> schema.NendoBlob: 

2344 """Stores a blob of data. 

2345 

2346 Args: 

2347 file_path (Union[str, FilePath]): The blob to store. 

2348 user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user 

2349 who's storing the file to blob. 

2350 

2351 Returns: 

2352 schema.NendoBlob: The stored blob. 

2353 """ 

2354 user_id = self._ensure_user_uuid(user_id) 

2355 blob = self._create_blob_from_file(file_path=file_path, user_id=user_id) 

2356 if blob is not None: 

2357 with self.session_scope() as session: 

2358 db_blob = self._upsert_blob_db(blob, session) 

2359 blob = schema.NendoBlob.model_validate(db_blob) 

2360 return blob 

2361 

2362 def store_blob_from_bytes( 

2363 self, 

2364 data: bytes, 

2365 user_id: Optional[Union[str, uuid.UUID]] = None, 

2366 ) -> schema.NendoBlob: 

2367 """Stores a data of type `bytes` to a blob. 

2368 

2369 Args: 

2370 data (bytes): The blob to store. 

2371 user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user 

2372 who's storing the bytes to blob. 

2373 

2374 Returns: 

2375 schema.NendoBlob: The stored blob. 

2376 """ 

2377 user_id = self._ensure_user_uuid(user_id) 

2378 blob = self._create_blob_from_bytes(data=data, user_id=user_id) 

2379 if blob is not None: 

2380 with self.session_scope() as session: 

2381 db_blob = self._upsert_blob_db(blob, session) 

2382 blob = schema.NendoBlob.model_validate(db_blob) 

2383 return blob 

2384 

2385 def remove_blob( 

2386 self, 

2387 blob_id: uuid.UUID, 

2388 remove_resources: bool = True, 

2389 user_id: Optional[Union[str, uuid.UUID]] = None, 

2390 ) -> bool: 

2391 """Deletes a blob of data. 

2392 

2393 Args: 

2394 blob_id (uuid.UUID): The UUID of the blob. 

2395 remove_resources (bool): If True, remove associated resources. 

2396 user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user 

2397 who's removing the blob. 

2398 

2399 Returns: 

2400 success (bool): True if removal was successful, False otherwise 

2401 """ 

2402 user_id = self._ensure_user_uuid(user_id) 

2403 with self.session_scope() as session: 

2404 target = ( 

2405 session.query(model.NendoBlobDB) 

2406 .filter(model.NendoBlobDB.id == blob_id) 

2407 .first() 

2408 ) 

2409 session.delete(target) 

2410 if remove_resources: 

2411 logger.info("Removing resources associated with Blob %s", str(blob_id)) 

2412 try: 

2413 self.storage_driver.remove_file( 

2414 file_name=target.resource["file_name"], 

2415 user_id=user_id, 

2416 ) 

2417 except Exception as e: # noqa: BLE001 

2418 logger.error("Removing %s failed: %s", target.resource.model_dump().src, e) 

2419 return True 

2420 

2421 # ================================== 

2422 # 

2423 # MISCELLANEOUS MANAGEMENT FUNCTIONS 

2424 # 

2425 # ================================== 

2426 

2427 def reset( 

2428 self, force: bool = False, user_id: Optional[Union[str, uuid.UUID]] = None, 

2429 ) -> None: 

2430 """Reset the nendo library. 

2431 

2432 Erase all tracks, collections and relationships. Ask before erasing everything. 

2433 

2434 Args: 

2435 force (bool, optional): Flag that specifies whether to ask the user for 

2436 confirmation of the operation. Default is to ask the user. 

2437 user_id (Optional[Union[str, uuid.UUID]], optional): ID of the user 

2438 who's resetting the library. If none is given, the configured 

2439 nendo default user will be used. 

2440 """ 

2441 user_id = self._ensure_user_uuid(user_id) 

2442 should_proceed = ( 

2443 force 

2444 or input( 

2445 "Are you sure you want to reset the library? " 

2446 "This will purge ALL tracks, collections and relationships!" 

2447 "Enter 'y' to confirm: ", 

2448 ).lower() 

2449 == "y" 

2450 ) 

2451 

2452 if not should_proceed: 

2453 logger.info("Reset operation cancelled.") 

2454 return 

2455 

2456 logger.info("Resetting nendo library.") 

2457 with self.session_scope() as session: 

2458 # delete all relationships 

2459 session.query(model.TrackTrackRelationshipDB).delete() 

2460 session.query(model.TrackCollectionRelationshipDB).delete() 

2461 session.query(model.CollectionCollectionRelationshipDB).delete() 

2462 # delete all plugin data 

2463 session.query(model.NendoPluginDataDB).delete() 

2464 session.commit() 

2465 # delete all collections 

2466 session.query(model.NendoCollectionDB).delete() 

2467 # delete all tracks 

2468 session.query(model.NendoTrackDB).delete() 

2469 # remove files 

2470 for library_file in self.storage_driver.list_files(user_id=user_id): 

2471 self.storage_driver.remove_file(file_name=library_file, user_id=user_id)