Database(url=None, *, force_rollback=None, config=None, full_isolation=None, poll_interval=None, **options)
An abstraction on the top of the EncodeORM databases.Database object.
This object allows to pass also a configuration dictionary in the format of
DATABASEZ_CONFIG = {
"connection": {
"credentials": {
"scheme": 'sqlite', "postgres"...
"host": ...,
"port": ...,
"user": ...,
"password": ...,
"database": ...,
"options": {
"driver": ...
"ssl": ...
}
}
}
}
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271 | def __init__(
self,
url: str | DatabaseURL | URL | Database | None = None,
*,
force_rollback: bool | None = None,
config: DictAny | None = None,
full_isolation: bool | None = None,
# for
poll_interval: float | None = None,
**options: Any,
):
init()
assert config is None or url is None, "Use either 'url' or 'config', not both."
if isinstance(url, Database):
assert not options, "Cannot specify options when copying a Database object."
self.backend = url.backend.__copy__()
self.url = url.url
self.options = url.options
self._call_hooks = url._call_hooks
if poll_interval is None:
poll_interval = url.poll_interval
if force_rollback is None:
force_rollback = bool(url.force_rollback)
if full_isolation is None:
full_isolation = bool(url._full_isolation)
else:
url = DatabaseURL(url)
if config and "connection" in config:
connection_config = config["connection"]
if "credentials" in connection_config:
connection_config = connection_config["credentials"]
url = url.replace(**connection_config)
self.backend, self.url, self.options = self.apply_database_url_and_options(
url, **options
)
if force_rollback is None:
force_rollback = False
if full_isolation is None:
full_isolation = False
if poll_interval is None:
poll_interval = DATABASEZ_POLL_INTERVAL
self.poll_interval = poll_interval
self._full_isolation = full_isolation
self._force_rollback = ForceRollback(force_rollback)
self.backend.owner = self
self._connection_map = weakref.WeakKeyDictionary()
self._databases_map = {}
# When `force_rollback=True` is used, we use a single global
# connection, within a transaction that always rolls back.
self._global_connection: Connection | None = None
self.ref_counter: int = 0
if sys.version_info >= (3, 10):
self.ref_lock: asyncio.Lock = asyncio.Lock()
else:
self.ref_lock = cast(asyncio.Lock, None)
|
_connection_map
instance-attribute
_connection_map = WeakKeyDictionary()
_databases_map
instance-attribute
_loop
class-attribute
instance-attribute
backend
instance-attribute
options
instance-attribute
is_connected
class-attribute
instance-attribute
_call_hooks
class-attribute
instance-attribute
_remove_global_connection
class-attribute
instance-attribute
_remove_global_connection = True
_full_isolation
class-attribute
instance-attribute
_full_isolation = full_isolation
poll_interval
instance-attribute
poll_interval = poll_interval
_force_rollback
instance-attribute
_force_rollback = ForceRollback(force_rollback)
force_rollback
class-attribute
instance-attribute
force_rollback = ForceRollbackDescriptor()
async_helper
class-attribute
instance-attribute
async_helper = AsyncHelperDatabase
_global_connection
instance-attribute
_global_connection = None
ref_counter
instance-attribute
ref_lock
instance-attribute
_connection
property
writable
__copy__
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| def __copy__(self) -> Database:
return self.__class__(self)
|
inc_refcount
async
Internal method to bump the ref_count.
Return True if ref_count is 0, False otherwise.
Should not be used outside of tests. Use connect and hooks instead.
Not multithreading safe!
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 | async def inc_refcount(self) -> bool:
"""
Internal method to bump the ref_count.
Return True if ref_count is 0, False otherwise.
Should not be used outside of tests. Use connect and hooks instead.
Not multithreading safe!
"""
async with self.ref_lock:
self.ref_counter += 1
# on the first call is count is 1 because of the former +1
if self.ref_counter == 1:
return True
return False
|
decr_refcount
async
Internal method to decrease the ref_count.
Return True if ref_count drops to 0, False otherwise.
Should not be used outside of tests. Use disconnect and hooks instead.
Not multithreading safe!
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328 | async def decr_refcount(self) -> bool:
"""
Internal method to decrease the ref_count.
Return True if ref_count drops to 0, False otherwise.
Should not be used outside of tests. Use disconnect and hooks instead.
Not multithreading safe!
"""
async with self.ref_lock:
self.ref_counter -= 1
# on the last call, the count is 0
if self.ref_counter == 0:
return True
return False
|
connect_hook
async
Refcount protected connect hook. Executed begore engine and global connection setup.
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| async def connect_hook(self) -> None:
"""Refcount protected connect hook. Executed begore engine and global connection setup."""
|
connect
async
Establish the connection pool.
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380 | async def connect(self) -> bool:
"""
Establish the connection pool.
"""
# py39 compatibility
if cast(Any, self.ref_lock) is None:
self.ref_lock = asyncio.Lock()
loop = asyncio.get_running_loop()
if self._loop is not None and loop != self._loop:
if self.poll_interval < 0:
raise RuntimeError("Subdatabases and polling are disabled")
# copy when not in map
if loop not in self._databases_map:
assert (
self._global_connection is not None
), "global connection should have been set"
# correctly initialize force_rollback with parent value
database = self.__class__(
self, force_rollback=bool(self.force_rollback), full_isolation=False
)
# prevent side effects of connect_hook
database._call_hooks = False
database._global_connection = self._global_connection
self._databases_map[loop] = database
# forward call
return await self._databases_map[loop].connect()
if not await self.inc_refcount():
assert self.is_connected, "ref_count < 0"
return False
if self._call_hooks:
try:
await self.connect_hook()
except BaseException as exc:
await self.decr_refcount()
raise exc
self._loop = asyncio.get_event_loop()
await self.backend.connect(self.url, **self.options)
logger.info("Connected to database %s", self.url.obscure_password, extra=CONNECT_EXTRA)
self.is_connected = True
if self._global_connection is None:
connection = Connection(self, force_rollback=True, full_isolation=self._full_isolation)
self._global_connection = connection
else:
self._remove_global_connection = False
return True
|
disconnect_hook
async
Refcount protected disconnect hook. Executed after connection, engine cleanup.
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| async def disconnect_hook(self) -> None:
"""Refcount protected disconnect hook. Executed after connection, engine cleanup."""
|
disconnect
async
disconnect(force=False, *, parent_database=None)
Close all connections in the connection pool.
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430 | @multiloop_protector(True, inject_parent=True)
async def disconnect(
self, force: bool = False, *, parent_database: Database | None = None
) -> bool:
"""
Close all connections in the connection pool.
"""
# parent_database is injected and should not be specified manually
if not await self.decr_refcount() or force:
if not self.is_connected:
logger.debug("Already disconnected, skipping disconnection")
return False
if force:
logger.warning("Force disconnect, despite refcount not 0")
else:
return False
if parent_database is not None:
loop = asyncio.get_running_loop()
del parent_database._databases_map[loop]
if force and self._databases_map:
assert not self._databases_map, "sub databases still active, force terminate them"
for sub_database in self._databases_map.values():
await arun_coroutine_threadsafe(
sub_database.disconnect(True), sub_database._loop, self.poll_interval
)
self._databases_map = {}
assert not self._databases_map, "sub databases still active"
try:
assert self._global_connection is not None
if self._remove_global_connection:
await self._global_connection.__aexit__()
self._global_connection = None
self._connection = None
finally:
logger.info(
"Disconnected from database %s",
self.url.obscure_password,
extra=DISCONNECT_EXTRA,
)
self.is_connected = False
await self.backend.disconnect()
self._loop = None
if self._call_hooks:
await self.disconnect_hook()
return True
|
fetch_all
async
fetch_all(query, values=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
447
448
449
450
451
452
453
454 | async def fetch_all(
self,
query: ClauseElement | str,
values: dict | None = None,
timeout: float | None = None,
) -> list[interfaces.Record]:
async with self.connection() as connection:
return await connection.fetch_all(query, values, timeout=timeout)
|
fetch_one
async
fetch_one(query, values=None, pos=0, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
456
457
458
459
460
461
462
463
464 | async def fetch_one(
self,
query: ClauseElement | str,
values: dict | None = None,
pos: int = 0,
timeout: float | None = None,
) -> interfaces.Record | None:
async with self.connection() as connection:
return await connection.fetch_one(query, values, pos=pos, timeout=timeout)
|
fetch_val
async
fetch_val(query, values=None, column=0, pos=0, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481 | async def fetch_val(
self,
query: ClauseElement | str,
values: dict | None = None,
column: Any = 0,
pos: int = 0,
timeout: float | None = None,
) -> Any:
async with self.connection() as connection:
return await connection.fetch_val(
query,
values,
column=column,
pos=pos,
timeout=timeout,
)
|
execute
async
execute(query, values=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
483
484
485
486
487
488
489
490 | async def execute(
self,
query: ClauseElement | str,
values: Any = None,
timeout: float | None = None,
) -> interfaces.Record | int:
async with self.connection() as connection:
return await connection.execute(query, values, timeout=timeout)
|
execute_many
async
execute_many(query, values=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
492
493
494
495
496
497
498
499 | async def execute_many(
self,
query: ClauseElement | str,
values: Any = None,
timeout: float | None = None,
) -> Sequence[interfaces.Record] | int:
async with self.connection() as connection:
return await connection.execute_many(query, values, timeout=timeout)
|
iterate
async
iterate(query, values=None, chunk_size=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
501
502
503
504
505
506
507
508
509
510 | async def iterate(
self,
query: ClauseElement | str,
values: dict | None = None,
chunk_size: int | None = None,
timeout: float | None = None,
) -> AsyncGenerator[interfaces.Record, None]:
async with self.connection() as connection:
async for record in connection.iterate(query, values, chunk_size, timeout=timeout):
yield record
|
batched_iterate
async
batched_iterate(query, values=None, batch_size=None, batch_wrapper=tuple, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531 | async def batched_iterate(
self,
query: ClauseElement | str,
values: dict | None = None,
batch_size: int | None = None,
batch_wrapper: BatchCallable = tuple,
timeout: float | None = None,
) -> AsyncGenerator[BatchCallableResult, None]:
async with self.connection() as connection:
async for batch in cast(
AsyncGenerator["BatchCallableResult", None],
connection.batched_iterate(
query,
values,
batch_wrapper=batch_wrapper,
batch_size=batch_size,
timeout=timeout,
),
):
yield batch
|
transaction
transaction(*, force_rollback=False, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| def transaction(self, *, force_rollback: bool = False, **kwargs: Any) -> Transaction:
return Transaction(self.connection, force_rollback=force_rollback, **kwargs)
|
run_sync
async
run_sync(fn, *args, timeout=None, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
536
537
538
539
540
541
542
543
544 | async def run_sync(
self,
fn: Callable[..., Any],
*args: Any,
timeout: float | None = None,
**kwargs: Any,
) -> Any:
async with self.connection() as connection:
return await connection.run_sync(fn, *args, **kwargs, timeout=timeout)
|
create_all
async
create_all(meta, timeout=None, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| async def create_all(
self, meta: MetaData, timeout: float | None = None, **kwargs: Any
) -> None:
async with self.connection() as connection:
await connection.create_all(meta, **kwargs, timeout=timeout)
|
drop_all
async
drop_all(meta, timeout=None, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| async def drop_all(self, meta: MetaData, timeout: float | None = None, **kwargs: Any) -> None:
async with self.connection() as connection:
await connection.drop_all(meta, **kwargs, timeout=timeout)
|
_non_global_connection
_non_global_connection(timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
556
557
558
559
560
561
562
563
564 | @multiloop_protector(False)
def _non_global_connection(
self,
timeout: float | None = None, # stub for type checker, multiloop_protector handles timeout
) -> Connection:
if self._connection is None:
_connection = self._connection = Connection(self)
return _connection
return self._connection
|
connection
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
| def connection(self, timeout: float | None = None) -> Connection:
if not self.is_connected:
raise RuntimeError("Database is not connected")
if self.force_rollback:
return cast(Connection, self._global_connection)
return self._non_global_connection(timeout=timeout)
|
asgi
asgi(app: None, handle_lifespan: bool = False) -> Callable[[ASGIApp], ASGIHelper]
asgi(app: ASGIApp, handle_lifespan: bool = False) -> ASGIHelper
asgi(app=None, handle_lifespan=False)
Return wrapper for asgi integration.
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
592
593
594
595
596
597
598
599
600 | def asgi(
self,
app: ASGIApp | None = None,
handle_lifespan: bool = False,
) -> ASGIHelper | Callable[[ASGIApp], ASGIHelper]:
"""Return wrapper for asgi integration."""
if app is not None:
return ASGIHelper(app=app, database=self, handle_lifespan=handle_lifespan)
return partial(ASGIHelper, database=self, handle_lifespan=handle_lifespan)
|
get_backends
classmethod
get_backends(scheme='', *, overwrite_paths=['databasez.overwrites'], database_name='Database', connection_name='Connection', transaction_name='Transaction', database_class=None, connection_class=None, transaction_class=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651 | @classmethod
def get_backends(
cls,
# let scheme empty for direct imports
scheme: str = "",
*,
overwrite_paths: Sequence[str] = ["databasez.overwrites"],
database_name: str = "Database",
connection_name: str = "Connection",
transaction_name: str = "Transaction",
database_class: type[interfaces.DatabaseBackend] | None = None,
connection_class: type[interfaces.ConnectionBackend] | None = None,
transaction_class: type[interfaces.TransactionBackend] | None = None,
) -> tuple[
type[interfaces.DatabaseBackend],
type[interfaces.ConnectionBackend],
type[interfaces.TransactionBackend],
]:
module: Any = None
for overwrite_path in overwrite_paths:
imp_path = f"{overwrite_path}.{scheme.replace('+', '_')}" if scheme else overwrite_path
try:
module = importlib.import_module(imp_path)
except ImportError as exc:
logging.debug(
f'Import of "{imp_path}" failed. This is not an error.', exc_info=exc
)
if "+" in scheme:
imp_path = f"{overwrite_path}.{scheme.split('+', 1)[0]}"
try:
module = importlib.import_module(imp_path)
except ImportError as exc:
logging.debug(
f'Import of "{imp_path}" failed. This is not an error.', exc_info=exc
)
if module is not None:
break
database_class = getattr(module, database_name, database_class)
assert database_class is not None and issubclass(
database_class, interfaces.DatabaseBackend
)
connection_class = getattr(module, connection_name, connection_class)
assert connection_class is not None and issubclass(
connection_class, interfaces.ConnectionBackend
)
transaction_class = getattr(module, transaction_name, transaction_class)
assert transaction_class is not None and issubclass(
transaction_class, interfaces.TransactionBackend
)
return database_class, connection_class, transaction_class
|
apply_database_url_and_options
classmethod
apply_database_url_and_options(url, *, overwrite_paths=['databasez.overwrites'], **options)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677 | @classmethod
def apply_database_url_and_options(
cls,
url: DatabaseURL | str,
*,
overwrite_paths: Sequence[str] = ["databasez.overwrites"],
**options: Any,
) -> tuple[interfaces.DatabaseBackend, DatabaseURL, dict[str, Any]]:
url = DatabaseURL(url)
database_class, connection_class, transaction_class = cls.get_backends(
url.scheme,
database_class=default_database,
connection_class=default_connection,
transaction_class=default_transaction,
overwrite_paths=overwrite_paths,
)
backend = database_class(
connection_class=connection_class, transaction_class=transaction_class
)
url, options = backend.extract_options(url, **options)
# check against transformed url
assert url.sqla_url.get_dialect(True).is_async, f'Dialect: "{url.scheme}" is not async.'
return backend, url, options
|