Skip to content

Database class

edgy.Database

Database(url=None, *, force_rollback=None, config=None, full_isolation=None, poll_interval=None, **options)

An abstraction on top of the EncodeORM databases.Database object.

This object also accepts a configuration dictionary in the following format:

DATABASEZ_CONFIG = { "connection": { "credentials": { "scheme": 'sqlite', "postgres"... "host": ..., "port": ..., "user": ..., "password": ..., "database": ..., "options": { "driver": ... "ssl": ... } } } }

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def __init__(
    self,
    url: str | DatabaseURL | URL | Database | None = None,
    *,
    force_rollback: bool | None = None,
    config: DictAny | None = None,
    full_isolation: bool | None = None,
    # for custom poll intervals
    poll_interval: float | None = None,
    **options: Any,
):
    """
    Set up a database from an url, a config dictionary or another Database.

    Args:
        url: Connection url, or an existing Database to copy. Must be None
            when `config` is given.
        force_rollback: Initial force-rollback state. Defaults to the value of
            the copied Database, or False otherwise.
        config: Dictionary whose `["connection"]["credentials"]` mapping (when
            present) is applied to the url via `url.replace(**credentials)`.
        full_isolation: Stored as `_full_isolation`. Defaults to the value of
            the copied Database, or False otherwise.
        poll_interval: Defaults to the value of the copied Database, or
            `utils.DATABASEZ_POLL_INTERVAL` otherwise.
        **options: Forwarded to `apply_database_url_and_options`; not allowed
            when copying a Database.
    """
    init()
    assert config is None or url is None, "Use either 'url' or 'config', not both."
    if isinstance(url, Database):
        assert not options, "Cannot specify options when copying a Database object."
        template = url
        self.backend = template.backend.__copy__()
        self.url = template.url
        self.options = template.options
        self._call_hooks = template._call_hooks
        # explicit keyword arguments win, otherwise inherit from the copied database
        poll_interval = template.poll_interval if poll_interval is None else poll_interval
        force_rollback = (
            bool(template.force_rollback) if force_rollback is None else force_rollback
        )
        full_isolation = (
            bool(template._full_isolation) if full_isolation is None else full_isolation
        )
    else:
        url = DatabaseURL(url)
        if config and "connection" in config and "credentials" in config["connection"]:
            url = url.replace(**config["connection"]["credentials"])
        resolved = self.apply_database_url_and_options(url, **options)
        self.backend, self.url, self.options = resolved
        force_rollback = False if force_rollback is None else force_rollback
        full_isolation = False if full_isolation is None else full_isolation
        if poll_interval is None:
            # when not using utils...., the constant cannot be changed at runtime
            poll_interval = utils.DATABASEZ_POLL_INTERVAL

    self.poll_interval = poll_interval
    self._full_isolation = full_isolation
    self._force_rollback = ForceRollback(force_rollback)
    self.backend.owner = self
    self._connection_map = weakref.WeakKeyDictionary()
    self._databases_map = {}

    # When `force_rollback=True` is used, we use a single global
    # connection, within a transaction that always rolls back.
    self._global_connection: Connection | None = None

    # reference counting state used by connect()/disconnect()
    self.ref_counter: int = 0
    self.ref_lock: asyncio.Lock = asyncio.Lock()

_connection_map instance-attribute

_connection_map = WeakKeyDictionary()

_databases_map instance-attribute

_databases_map = {}

_loop class-attribute instance-attribute

_loop = None

backend instance-attribute

backend

url instance-attribute

url

options instance-attribute

options

is_connected class-attribute instance-attribute

is_connected = False

_call_hooks class-attribute instance-attribute

_call_hooks = True

_remove_global_connection class-attribute instance-attribute

_remove_global_connection = True

_full_isolation class-attribute instance-attribute

_full_isolation = full_isolation

poll_interval instance-attribute

poll_interval = poll_interval

_force_rollback instance-attribute

_force_rollback = ForceRollback(force_rollback)

force_rollback class-attribute instance-attribute

force_rollback = ForceRollbackDescriptor()

async_helper class-attribute instance-attribute

async_helper = AsyncHelperDatabase

_global_connection instance-attribute

_global_connection = None

ref_counter instance-attribute

ref_counter = 0

ref_lock instance-attribute

ref_lock = Lock()

_current_task property

_current_task

_connection property writable

_connection

engine property

engine

__copy__

__copy__()
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
260
261
def __copy__(self) -> Database:
    """Return a new instance of the same class, built by passing `self` to its constructor."""
    factory = self.__class__
    return factory(self)

inc_refcount async

inc_refcount()

Internal method to bump the ref_count.

Return True if ref_count is 0, False otherwise.

Should not be used outside of tests. Use connect and hooks instead. Not multithreading safe!

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
async def inc_refcount(self) -> bool:
    """
    Internal method to bump the ref_count.

    Return True if the ref_count was 0 before the bump (it is 1 afterwards),
    False otherwise.

    Should not be used outside of tests. Use connect and hooks instead.
    Not multithreading safe!
    """
    async with self.ref_lock:
        self.ref_counter += 1
        # the very first reference leaves the counter at exactly 1
        is_first_reference = self.ref_counter == 1
    return is_first_reference

decr_refcount async

decr_refcount()

Internal method to decrease the ref_count.

Return True if ref_count drops to 0, False otherwise.

Should not be used outside of tests. Use disconnect and hooks instead. Not multithreading safe!

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
async def decr_refcount(self) -> bool:
    """
    Internal method to decrease the ref_count.

    Return True if the ref_count is exactly 0 after the decrement, False otherwise.

    Should not be used outside of tests. Use disconnect and hooks instead.
    Not multithreading safe!
    """
    async with self.ref_lock:
        self.ref_counter -= 1
        # releasing the last reference leaves the counter at exactly 0
        released_last_reference = self.ref_counter == 0
    return released_last_reference

connect_hook async

connect_hook()

Refcount protected connect hook. Executed before engine and global connection setup.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
317
318
async def connect_hook(self) -> None:
    """Refcount protected connect hook. Executed before engine and global connection setup.

    The default implementation does nothing. `connect()` awaits this hook only
    when `_call_hooks` is true and the reference count was just raised from 0;
    if the hook raises, `connect()` decrements the reference count again and
    re-raises the exception.
    """

connect async

connect()

Establish the connection pool.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
async def connect(self) -> bool:
    """
    Establish the connection pool.

    Returns True when this call actually connected the backend and False when
    the database was already referenced (only the reference count is bumped).
    When called from a different event loop than the one the database is bound
    to, the call is forwarded to a per-loop sub-database.

    Raises:
        RuntimeError: when called from a foreign loop while `poll_interval` is negative.
    """
    current_loop = asyncio.get_running_loop()
    if self._loop is not None and current_loop != self._loop:
        if self.poll_interval < 0:
            raise RuntimeError("Subdatabases and polling are disabled")
        # lazily create one sub-database per foreign loop
        if current_loop not in self._databases_map:
            assert self._global_connection is not None, (
                "global connection should have been set"
            )
            # correctly initialize force_rollback with parent value
            sub_database = self.__class__(
                self, force_rollback=bool(self.force_rollback), full_isolation=False
            )
            # prevent side effects of connect_hook
            sub_database._call_hooks = False
            sub_database._global_connection = self._global_connection
            self._databases_map[current_loop] = sub_database
        # forward call
        return await self._databases_map[current_loop].connect()

    is_first_reference = await self.inc_refcount()
    if not is_first_reference:
        assert self.is_connected, "ref_count < 0"
        return False
    if self._call_hooks:
        try:
            await self.connect_hook()
        except BaseException:
            # undo the reference taken above before propagating the failure
            await self.decr_refcount()
            raise
    self._loop = asyncio.get_event_loop()

    await self.backend.connect(self.url, **self.options)
    self.is_connected = True

    if self._global_connection is not None:
        # a global connection was provided from outside, disconnect() must not tear it down
        self._remove_global_connection = False
    else:
        self._global_connection = Connection(
            self, force_rollback=True, full_isolation=self._full_isolation
        )
    return True

disconnect_hook async

disconnect_hook()

Refcount protected disconnect hook. Executed after connection, engine cleanup.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
365
366
async def disconnect_hook(self) -> None:
    """Refcount protected disconnect hook. Executed after connection, engine cleanup.

    The default implementation does nothing. `disconnect()` awaits this hook in
    its `finally` block, after `backend.disconnect()` and after the loop binding
    was cleared, and only when `_call_hooks` is true.
    """

disconnect async

disconnect(force=False, *, parent_database=None)

Close all connections in the connection pool.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
@multiloop_protector(True, inject_parent=True)
async def disconnect(
    self, force: bool = False, *, parent_database: Database | None = None
) -> bool:
    """
    Close all connections in the connection pool.

    Args:
        force: Disconnect even when the reference count did not drop to 0.
            Still active sub-databases are force-disconnected as well.
        parent_database: Injected by the decorator and should not be specified
            manually. When set, this database is removed from the parent's
            per-loop map.
            NOTE(review): presumably set by `multiloop_protector` when the call
            was forwarded from a parent database - confirm against the decorator.

    Returns:
        True when the database was actually disconnected, False when other
        references remain or the database was already disconnected.
    """
    # parent_database is injected and should not be specified manually
    if not await self.decr_refcount() or force:
        if not self.is_connected:
            logger.debug("Already disconnected, skip disconnecting")
            return False
        if force:
            logger.warning("Force disconnect, despite refcount not 0")
        else:
            return False
    if parent_database is not None:
        loop = asyncio.get_running_loop()
        del parent_database._databases_map[loop]
    if force and self._databases_map:
        # This used to be an `assert not self._databases_map`, which can never
        # hold inside this branch and made the forced cleanup below unreachable.
        logger.warning("sub databases still active, force terminate them")
        # iterate over a snapshot so the map can safely change during teardown
        for sub_database in list(self._databases_map.values()):
            await arun_coroutine_threadsafe(
                sub_database.disconnect(True),
                sub_database._loop,
                self.poll_interval,
            )
        self._databases_map = {}
    assert not self._databases_map, "sub databases still active"

    try:
        assert self._global_connection is not None
        # only tear down the global connection when connect() created it
        if self._remove_global_connection:
            await self._global_connection.__aexit__()
            self._global_connection = None
        self._connection = None
    finally:
        # always release the backend and the loop binding, even when the cleanup above failed
        self.is_connected = False
        await self.backend.disconnect()
        self._loop = None
        if self._call_hooks:
            await self.disconnect_hook()
    return True

fetch_all async

fetch_all(query, values=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
427
428
429
430
431
432
433
434
async def fetch_all(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    timeout: float | None = None,
) -> list[interfaces.Record]:
    """Run `fetch_all` on a connection obtained from `self.connection()` and return its result."""
    async with self.connection() as conn:
        records = await conn.fetch_all(query, values, timeout=timeout)
        return records

fetch_one async

fetch_one(query, values=None, pos=0, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
436
437
438
439
440
441
442
443
444
async def fetch_one(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    pos: int = 0,
    timeout: float | None = None,
) -> interfaces.Record | None:
    """Run `fetch_one` on a connection obtained from `self.connection()` and return its result."""
    async with self.connection() as conn:
        record = await conn.fetch_one(query, values, pos=pos, timeout=timeout)
        return record

fetch_val async

fetch_val(query, values=None, column=0, pos=0, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
async def fetch_val(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    column: Any = 0,
    pos: int = 0,
    timeout: float | None = None,
) -> Any:
    """Run `fetch_val` on a connection obtained from `self.connection()` and return its result."""
    async with self.connection() as conn:
        value = await conn.fetch_val(query, values, column=column, pos=pos, timeout=timeout)
        return value

execute async

execute(query, values=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
463
464
465
466
467
468
469
470
async def execute(
    self,
    query: ClauseElement | str,
    values: Any = None,
    timeout: float | None = None,
) -> interfaces.Record | int:
    """Run `execute` on a connection obtained from `self.connection()` and return its result."""
    async with self.connection() as conn:
        outcome = await conn.execute(query, values, timeout=timeout)
        return outcome

execute_many async

execute_many(query, values=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
472
473
474
475
476
477
478
479
async def execute_many(
    self,
    query: ClauseElement | str,
    values: Any = None,
    timeout: float | None = None,
) -> Sequence[interfaces.Record] | int:
    """Run `execute_many` on a connection obtained from `self.connection()` and return its result."""
    async with self.connection() as conn:
        outcome = await conn.execute_many(query, values, timeout=timeout)
        return outcome

iterate async

iterate(query, values=None, chunk_size=None, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
481
482
483
484
485
486
487
488
489
490
async def iterate(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    chunk_size: int | None = None,
    timeout: float | None = None,
) -> AsyncGenerator[interfaces.Record, None]:
    """Yield every record produced by `iterate` on a connection from `self.connection()`.

    The connection context stays open for as long as the generator is being consumed.
    """
    async with self.connection() as conn:
        record_stream = conn.iterate(query, values, chunk_size, timeout=timeout)
        async for row in record_stream:
            yield row

batched_iterate async

batched_iterate(query, values=None, batch_size=None, batch_wrapper=tuple, timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
async def batched_iterate(
    self,
    query: ClauseElement | str,
    values: dict | None = None,
    batch_size: int | None = None,
    batch_wrapper: BatchCallable = tuple,
    timeout: float | None = None,
) -> AsyncGenerator[BatchCallableResult, None]:
    """Yield every batch produced by `batched_iterate` on a connection from `self.connection()`.

    The connection context stays open for as long as the generator is being consumed.
    """
    async with self.connection() as conn:
        batch_stream = cast(
            AsyncGenerator["BatchCallableResult", None],
            conn.batched_iterate(
                query,
                values,
                batch_wrapper=batch_wrapper,
                batch_size=batch_size,
                timeout=timeout,
            ),
        )
        async for chunk in batch_stream:
            yield chunk

transaction

transaction(*, force_rollback=False, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
513
514
def transaction(self, *, force_rollback: bool = False, **kwargs: Any) -> Transaction:
    """Create a Transaction that receives the bound `self.connection` method as its connection source."""
    connection_factory = self.connection
    return Transaction(connection_factory, force_rollback=force_rollback, **kwargs)

run_sync async

run_sync(fn, *args, timeout=None, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
516
517
518
519
520
521
522
523
524
async def run_sync(
    self,
    fn: Callable[..., Any],
    *args: Any,
    timeout: float | None = None,
    **kwargs: Any,
) -> Any:
    """Run `run_sync` on a connection obtained from `self.connection()`.

    `fn`, `*args`, `**kwargs` and `timeout` are passed through unchanged and the
    result of the connection call is returned.
    """
    async with self.connection() as conn:
        outcome = await conn.run_sync(fn, *args, **kwargs, timeout=timeout)
        return outcome

create_all async

create_all(meta, timeout=None, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
526
527
528
529
530
async def create_all(
    self, meta: MetaData, timeout: float | None = None, **kwargs: Any
) -> None:
    """Run `create_all(meta, ...)` on a connection obtained from `self.connection()`."""
    async with self.connection() as conn:
        await conn.create_all(meta, **kwargs, timeout=timeout)

drop_all async

drop_all(meta, timeout=None, **kwargs)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
532
533
534
async def drop_all(self, meta: MetaData, timeout: float | None = None, **kwargs: Any) -> None:
    """Run `drop_all(meta, ...)` on a connection obtained from `self.connection()`."""
    async with self.connection() as conn:
        await conn.drop_all(meta, **kwargs, timeout=timeout)

_non_global_connection

_non_global_connection(timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
536
537
538
539
540
541
542
543
544
545
546
@multiloop_protector(False)
def _non_global_connection(
    self,
    timeout: (
        float | None
    ) = None,  # stub for type checker, multiloop_protector handles timeout
) -> Connection:
    """Return the connection stored in `self._connection`, creating and storing one if it is unset."""
    existing = self._connection
    if existing is not None:
        return existing
    created = Connection(self)
    self._connection = created
    return created

connection

connection(timeout=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
548
549
550
551
552
553
def connection(self, timeout: float | None = None) -> Connection:
    """
    Return the connection to use for queries.

    With `force_rollback` active the shared global connection is returned,
    otherwise the result of `_non_global_connection(timeout=timeout)`.

    Raises:
        RuntimeError: when the database is not connected.
    """
    if not self.is_connected:
        raise RuntimeError("Database is not connected")
    if not self.force_rollback:
        return self._non_global_connection(timeout=timeout)
    # force_rollback: every caller shares the single global connection
    return cast(Connection, self._global_connection)

asgi

asgi(app: None, handle_lifespan: bool = False) -> Callable[[ASGIApp], ASGIApp]
asgi(app: ASGIApp, handle_lifespan: bool = False) -> ASGIApp
asgi(app=None, handle_lifespan=False)

Return wrapper for asgi integration.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
574
575
576
577
578
579
580
581
582
583
584
585
586
587
def asgi(
    self,
    app: ASGIApp | None = None,
    handle_lifespan: bool = False,
) -> ASGIApp | Callable[[ASGIApp], ASGIApp]:
    """Return wrapper for asgi integration.

    The returned `LifespanHook` receives a setup coroutine that connects this
    database and registers `disconnect` on an `AsyncExitStack`; `do_forward` is
    the negation of `handle_lifespan`.
    """

    async def setup() -> contextlib.AsyncExitStack:
        exit_stack = contextlib.AsyncExitStack()
        await self.connect()
        # registered only after a successful connect
        exit_stack.push_async_callback(self.disconnect)
        return exit_stack

    forward_lifespan = not handle_lifespan
    return LifespanHook(app, setup=setup, do_forward=forward_lifespan)

get_backends classmethod

get_backends(scheme='', *, overwrite_paths=['databasez.overwrites'], database_name='Database', connection_name='Connection', transaction_name='Transaction', database_class=None, connection_class=None, transaction_class=None)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
@classmethod
def get_backends(
    cls,
    # let scheme empty for direct imports
    scheme: str = "",
    *,
    overwrite_paths: Sequence[str] = ["databasez.overwrites"],
    database_name: str = "Database",
    connection_name: str = "Connection",
    transaction_name: str = "Transaction",
    database_class: type[interfaces.DatabaseBackend] | None = None,
    connection_class: type[interfaces.ConnectionBackend] | None = None,
    transaction_class: type[interfaces.TransactionBackend] | None = None,
) -> tuple[
    type[interfaces.DatabaseBackend],
    type[interfaces.ConnectionBackend],
    type[interfaces.TransactionBackend],
]:
    """
    Resolve the database, connection and transaction backend classes.

    For every entry of `overwrite_paths` the module `<path>.<scheme>` (with `+`
    replaced by `_`) is tried first; for schemes containing `+` the module named
    after the part before the first `+` is tried next. With an empty scheme the
    path itself is imported. The first importable module wins. Classes are
    looked up on it by `database_name`, `connection_name` and
    `transaction_name`, falling back to the `*_class` arguments.

    Raises:
        AssertionError: when a resolved class is None or not a subclass of the
            matching `interfaces` backend base class.
    """
    # when not using utils...., the constant cannot be changed at runtime for debug purposes
    verbose_errors = utils.DATABASEZ_OVERWRITE_LOGGING
    module: Any = None
    for overwrite_path in overwrite_paths:
        # candidate modules in lookup order: full scheme first, then the dialect-only part
        if scheme:
            candidates = [f"{overwrite_path}.{scheme.replace('+', '_')}"]
            if "+" in scheme:
                candidates.append(f"{overwrite_path}.{scheme.split('+', 1)[0]}")
        else:
            candidates = [overwrite_path]
        for imp_path in candidates:
            try:
                module = importlib.import_module(imp_path)
            except ImportError as exc:
                logging.debug(
                    f'Could not import "{imp_path}". Continue search.',
                    exc_info=exc if verbose_errors else None,
                )
            else:
                break
        if module is not None:
            break
    if module is None:
        logging.debug(
            "No overwrites found. Use default.",
        )

    # getattr on a missing module (None) simply yields the supplied default class
    resolved_database = getattr(module, database_name, database_class)
    assert resolved_database is not None and issubclass(
        resolved_database, interfaces.DatabaseBackend
    )
    resolved_connection = getattr(module, connection_name, connection_class)
    assert resolved_connection is not None and issubclass(
        resolved_connection, interfaces.ConnectionBackend
    )
    resolved_transaction = getattr(module, transaction_name, transaction_class)
    assert resolved_transaction is not None and issubclass(
        resolved_transaction, interfaces.TransactionBackend
    )
    return resolved_database, resolved_connection, resolved_transaction

apply_database_url_and_options classmethod

apply_database_url_and_options(url, *, overwrite_paths=['databasez.overwrites'], **options)
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
@classmethod
def apply_database_url_and_options(
    cls,
    url: DatabaseURL | str,
    *,
    overwrite_paths: Sequence[str] = ["databasez.overwrites"],
    **options: Any,
) -> tuple[interfaces.DatabaseBackend, DatabaseURL, dict[str, Any]]:
    """
    Build the backend for `url` and let it extract its options.

    Returns:
        A tuple of the backend instance, the url as returned by
        `backend.extract_options` and the options as returned by it.

    Raises:
        AssertionError: when the dialect of the returned url is not async.
    """
    parsed_url = DatabaseURL(url)
    backend_classes = cls.get_backends(
        parsed_url.scheme,
        database_class=default_database,
        connection_class=default_connection,
        transaction_class=default_transaction,
        overwrite_paths=overwrite_paths,
    )
    database_class, connection_class, transaction_class = backend_classes

    backend = database_class(
        connection_class=connection_class, transaction_class=transaction_class
    )
    final_url, final_options = backend.extract_options(parsed_url, **options)
    # check against transformed url
    dialect = final_url.sqla_url.get_dialect(True)
    assert dialect.is_async, f'Dialect: "{final_url.scheme}" is not async.'

    return backend, final_url, final_options