Skip to content

Registry class¶

edgy.Registry ¶

Registry(database, *, with_content_type=False, schema=None, extra=None, automigrate_config=None, **kwargs)

The command center for the models of Edgy.

Source code in edgy/core/connection/registry.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def __init__(
    self,
    database: Union[Database, str, DatabaseURL],
    *,
    with_content_type: Union[bool, type["BaseModelType"]] = False,
    schema: Union[str, None] = None,
    extra: Optional[dict[str, Database]] = None,
    automigrate_config: Union["EdgySettings", None] = None,
    **kwargs: Any,
) -> None:
    evaluate_settings_once_ready()
    self.db_schema = schema
    self._automigrate_config = automigrate_config
    self._is_automigrated: bool = False
    extra = extra or {}
    self.database: Database = (
        database if isinstance(database, Database) else Database(database, **kwargs)
    )
    self.models: dict[str, type[BaseModelType]] = {}
    self.reflected: dict[str, type[BaseModelType]] = {}
    self.tenant_models: dict[str, type[BaseModelType]] = {}
    self.pattern_models: dict[str, type[AutoReflectionModel]] = {}
    self.dbs_reflected = set()

    self.schema = Schema(registry=self)
    # when setting a Model or Reflected Model execute the callbacks
    # Note: they are only executed if the Model is not in Registry yet
    self._onetime_callbacks: dict[
        Union[str, None], list[Callable[[type[BaseModelType]], None]]
    ] = defaultdict(list)
    self._callbacks: dict[Union[str, None], list[Callable[[type[BaseModelType]], None]]] = (
        defaultdict(list)
    )

    self.extra: dict[str, Database] = {
        k: v if isinstance(v, Database) else Database(v) for k, v in extra.items()
    }
    # we want to get all problems before failing
    assert all(
        [self.extra_name_check(x) for x in self.extra]  # noqa: C419
    ), "Invalid name in extra detected. See logs for details."
    self.metadata_by_url = MetaDataByUrlDict(registry=self)

    if with_content_type is not False:
        self._set_content_type(with_content_type)

model_registry_types class-attribute ¶

model_registry_types = ('models', 'reflected', 'tenant_models', 'pattern_models')

db_schema class-attribute instance-attribute ¶

db_schema = schema

content_type class-attribute instance-attribute ¶

content_type = None

dbs_reflected instance-attribute ¶

dbs_reflected = set()

_automigrate_config instance-attribute ¶

_automigrate_config = automigrate_config

_is_automigrated instance-attribute ¶

_is_automigrated = False

database instance-attribute ¶

database = database if isinstance(database, Database) else Database(database, **kwargs)

models instance-attribute ¶

models = {}

reflected instance-attribute ¶

reflected = {}

tenant_models instance-attribute ¶

tenant_models = {}

pattern_models instance-attribute ¶

pattern_models = {}

schema instance-attribute ¶

schema = Schema(registry=self)

_onetime_callbacks instance-attribute ¶

_onetime_callbacks = defaultdict(list)

_callbacks instance-attribute ¶

_callbacks = defaultdict(list)

extra instance-attribute ¶

extra = {k: v if isinstance(v, Database) else Database(v)for (k, v) in items()}

metadata_by_url instance-attribute ¶

metadata_by_url = MetaDataByUrlDict(registry=self)

metadata_by_name property writable ¶

metadata_by_name

metadata property ¶

metadata

declarative_base cached property ¶

declarative_base

engine property ¶

engine

sync_engine property ¶

sync_engine

apply_default_force_nullable_fields async ¶

apply_default_force_nullable_fields(*, force_fields_nullable=None, model_defaults=None, filter_db_url=None, filter_db_name=None)

For online migrations and after migrations to apply defaults.

Source code in edgy/core/connection/registry.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def apply_default_force_nullable_fields(
    self,
    *,
    force_fields_nullable: Optional[Iterable[tuple[str, str]]] = None,
    model_defaults: Optional[dict[str, dict[str, Any]]] = None,
    filter_db_url: Optional[str] = None,
    filter_db_name: Union[str, None] = None,
) -> None:
    """For online migrations and after migrations to apply defaults."""
    if force_fields_nullable is None:
        force_fields_nullable = set(FORCE_FIELDS_NULLABLE.get())
    else:
        force_fields_nullable = set(force_fields_nullable)
    if model_defaults is None:
        model_defaults = {}
    for model_name, defaults in model_defaults.items():
        for default_name in defaults:
            force_fields_nullable.add((model_name, default_name))
    # for empty model names extract all matching models
    for item in list(force_fields_nullable):
        if not item[0]:
            force_fields_nullable.discard(item)
            for model in self.models.values():
                if item[1] in model.meta.fields:
                    force_fields_nullable.add((model.__name__, item[1]))

    if not force_fields_nullable:
        return
    if isinstance(filter_db_name, str):
        if filter_db_name:
            filter_db_url = str(self.extra[filter_db_name].url)
        else:
            filter_db_url = str(self.database.url)
    models_with_fields: dict[str, set[str]] = {}
    for item in force_fields_nullable:
        if item[0] not in self.models:
            continue
        if item[1] not in self.models[item[0]].meta.fields:
            continue
        if not self.models[item[0]].meta.fields[item[1]].has_default():
            overwrite_default = model_defaults.get(item[0]) or {}
            if item[1] not in overwrite_default:
                continue
        field_set = models_with_fields.setdefault(item[0], set())
        field_set.add(item[1])
    if not models_with_fields:
        return
    ops = []
    for model_name, field_set in models_with_fields.items():
        model = self.models[model_name]
        if filter_db_url and str(model.database.url) != filter_db_url:
            continue
        model_specific_defaults = model_defaults.get(model_name) or {}
        filter_kwargs = dict.fromkeys(field_set)

        async def wrapper_fn(
            _model: type["BaseModelType"] = model,
            _model_specific_defaults: dict = model_specific_defaults,
            _filter_kwargs: dict = filter_kwargs,
            _field_set: set[str] = field_set,
        ) -> None:
            # To reduce the memory usage, only retrieve pknames and load per object
            # We need to load all at once because otherwise the cursor could interfere with updates
            query = _model.query.filter(**_filter_kwargs).only(*_model.pknames)
            for obj in await query:
                await obj.load()
                kwargs = {
                    k: v for k, v in obj.extract_db_fields().items() if k not in _field_set
                }
                kwargs.update(_model_specific_defaults)
                # We need to serialize per table because otherwise transactions can fail
                # because of interlocking errors.
                # Also the tables can get big
                # is_partial = False
                token = CURRENT_INSTANCE.set(query)
                try:
                    await obj._update(
                        False,
                        kwargs,
                        pre_fn=partial(
                            _model.meta.signals.pre_update.send_async,
                            is_update=True,
                            is_migration=True,
                        ),
                        post_fn=partial(
                            _model.meta.signals.post_update.send_async,
                            is_update=True,
                            is_migration=True,
                        ),
                        instance=query,
                    )
                finally:
                    CURRENT_INSTANCE.reset(token)

        ops.append(wrapper_fn())
    await asyncio.gather(*ops)

extra_name_check ¶

extra_name_check(name)
Source code in edgy/core/connection/registry.py
253
254
255
256
257
258
259
260
261
262
263
264
265
def extra_name_check(self, name: Any) -> bool:
    if not isinstance(name, str):
        logger.error(f"Extra database name: {name!r} is not a string.")
        return False
    elif not name.strip():
        logger.error(f'Extra database name: "{name}" is empty.')
        return False

    if name.strip() != name:
        logger.warning(
            f'Extra database name: "{name}" starts or ends with whitespace characters.'
        )
    return True

__copy__ ¶

__copy__()
Source code in edgy/core/connection/registry.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def __copy__(self) -> "Registry":
    content_type: Union[bool, type[BaseModelType]] = False
    if self.content_type is not None:
        try:
            content_type = self.get_model(
                "ContentType", include_content_type_attr=False
            ).copy_edgy_model()
        except LookupError:
            content_type = self.content_type
    _copy = Registry(
        self.database, with_content_type=content_type, schema=self.db_schema, extra=self.extra
    )
    for registry_type in self.model_registry_types:
        dict_models = getattr(_copy, registry_type)
        dict_models.update(
            (
                (
                    key,
                    val.copy_edgy_model(registry=_copy),
                )
                for key, val in getattr(self, registry_type).items()
                if not val.meta.no_copy and key not in dict_models
            )
        )
    _copy.dbs_reflected = set(self.dbs_reflected)
    return _copy

_set_content_type ¶

_set_content_type(with_content_type, old_content_type_to_replace=None)
Source code in edgy/core/connection/registry.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def _set_content_type(
    self,
    with_content_type: Union[Literal[True], type["BaseModelType"]],
    old_content_type_to_replace: Optional[type["BaseModelType"]] = None,
) -> None:
    from edgy.contrib.contenttypes.fields import BaseContentTypeField, ContentTypeField
    from edgy.contrib.contenttypes.models import ContentType
    from edgy.core.db.models.metaclasses import MetaInfo
    from edgy.core.db.relationships.related_field import RelatedField
    from edgy.core.utils.models import create_edgy_model

    if with_content_type is True:
        with_content_type = ContentType

    real_content_type: type[BaseModelType] = with_content_type

    if real_content_type.meta.abstract:
        meta_args = {
            "tablename": "contenttypes",
            "registry": self,
        }

        new_meta: MetaInfo = MetaInfo(None, **meta_args)
        # model adds itself to registry and executes callbacks
        real_content_type = create_edgy_model(
            "ContentType",
            with_content_type.__module__,
            __metadata__=new_meta,
            __bases__=(with_content_type,),
        )
    elif real_content_type.meta.registry is None:
        real_content_type.add_to_registry(self, name="ContentType")
    self.content_type = real_content_type

    def callback(model_class: type["BaseModelType"]) -> None:
        # they are not updated, despite this shouldn't happen anyway
        if issubclass(model_class, ContentType):
            return
        # skip if is explicit set or remove when copying
        for field in model_class.meta.fields.values():
            if isinstance(field, BaseContentTypeField):
                if (
                    old_content_type_to_replace is not None
                    and field.target is old_content_type_to_replace
                ):
                    field.target_registry = self
                    field.target = real_content_type
                    # simply overwrite
                    real_content_type.meta.fields[field.related_name] = RelatedField(
                        name=field.related_name,
                        foreign_key_name=field.name,
                        related_from=model_class,
                        owner=real_content_type,
                    )
                return

        # e.g. exclude field
        if "content_type" in model_class.meta.fields:
            return
        related_name = f"reverse_{model_class.__name__.lower()}"
        assert related_name not in real_content_type.meta.fields, (
            f"duplicate model name: {model_class.__name__}"
        )

        field_args: dict[str, Any] = {
            "name": "content_type",
            "owner": model_class,
            "to": real_content_type,
            "no_constraint": real_content_type.no_constraint,
            "no_copy": True,
        }
        if model_class.meta.registry is not real_content_type.meta.registry:
            field_args["relation_has_post_delete_callback"] = True
            field_args["force_cascade_deletion_relation"] = True
        model_class.meta.fields["content_type"] = cast(
            "BaseFieldType",
            ContentTypeField(**field_args),
        )
        real_content_type.meta.fields[related_name] = RelatedField(
            name=related_name,
            foreign_key_name="content_type",
            related_from=model_class,
            owner=real_content_type,
        )

    self.register_callback(None, callback, one_time=False)

get_model ¶

get_model(model_name, *, include_content_type_attr=True, exclude=())
Source code in edgy/core/connection/registry.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def get_model(
    self,
    model_name: str,
    *,
    include_content_type_attr: bool = True,
    exclude: Container[str] = (),
) -> type["BaseModelType"]:
    if (
        include_content_type_attr
        and model_name == "ContentType"
        and self.content_type is not None
    ):
        return self.content_type
    for model_dict_name in self.model_registry_types:
        if model_dict_name in exclude:
            continue
        model_dict: dict = getattr(self, model_dict_name)
        if model_name in model_dict:
            return cast(type["BaseModelType"], model_dict[model_name])
    raise LookupError(f'Registry doesn\'t have a "{model_name}" model.') from None

delete_model ¶

delete_model(model_name)
Source code in edgy/core/connection/registry.py
425
426
427
428
429
430
431
def delete_model(self, model_name: str) -> bool:
    for model_dict_name in self.model_registry_types:
        model_dict: dict = getattr(self, model_dict_name)
        if model_name in model_dict:
            del model_dict[model_name]
            return True
    return False

refresh_metadata ¶

refresh_metadata(*, update_only=False, multi_schema=False, ignore_schema_pattern='information_schema')
Source code in edgy/core/connection/registry.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
def refresh_metadata(
    self,
    *,
    update_only: bool = False,
    multi_schema: Union[bool, re.Pattern, str] = False,
    ignore_schema_pattern: Union[None, "re.Pattern", str] = "information_schema",
) -> None:
    if not update_only:
        for val in self.metadata_by_name.values():
            val.clear()
    maindatabase_url = str(self.database.url)
    if multi_schema is not False:
        schemes_tree: dict[str, tuple[Optional[str], list[str]]] = {
            v[0]: (key, v[2])
            for key, v in run_sync(self.schema.get_schemes_tree(no_reflect=True)).items()
        }
    else:
        schemes_tree = {
            maindatabase_url: (None, [self.db_schema]),
            **{str(v.url): (k, [None]) for k, v in self.extra.items()},
        }

    if isinstance(multi_schema, str):
        multi_schema = re.compile(multi_schema)
    if isinstance(ignore_schema_pattern, str):
        ignore_schema_pattern = re.compile(ignore_schema_pattern)
    for model_class in self.models.values():
        if not update_only:
            model_class._table = None
            model_class._db_schemas = {}
        url = str(model_class.database.url)
        if url in schemes_tree:
            extra_key, schemes = schemes_tree[url]
            for schema in schemes:
                if multi_schema is not False:
                    if multi_schema is not True and multi_schema.match(schema) is None:
                        continue
                    if (
                        ignore_schema_pattern is not None
                        and ignore_schema_pattern.match(schema) is not None
                    ):
                        continue
                    if not getattr(model_class.meta, "is_tenant", False):
                        if (
                            model_class.__using_schema__ is Undefined
                            or model_class.__using_schema__ is None
                        ):
                            if schema != "":
                                continue
                        elif model_class.__using_schema__ != schema:
                            continue
                model_class.table_schema(schema=schema, metadata=self.metadata_by_url[url])

    # don't initialize to keep the metadata clean
    if not update_only:
        for model_class in self.reflected.values():
            model_class._table = None
            model_class._db_schemas = {}

register_callback ¶

register_callback(name_or_class, callback, one_time=None)
Source code in edgy/core/connection/registry.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
def register_callback(
    self,
    name_or_class: Union[type["BaseModelType"], str, None],
    callback: Callable[[type["BaseModelType"]], None],
    one_time: Optional[bool] = None,
) -> None:
    if one_time is None:
        # True for model specific callbacks, False for general callbacks
        one_time = name_or_class is not None
    called: bool = False
    if name_or_class is None:
        for model in self.models.values():
            callback(model)
            called = True
        for model in self.reflected.values():
            callback(model)
            called = True
        for name, model in self.tenant_models.items():
            # for tenant only models
            if name not in self.models:
                callback(model)
                called = True
    elif not isinstance(name_or_class, str):
        callback(name_or_class)
        called = True
    else:
        model_class = None
        with contextlib.suppress(LookupError):
            model_class = self.get_model(name_or_class)
        if model_class is not None:
            callback(model_class)
            called = True
    if name_or_class is not None and not isinstance(name_or_class, str):
        name_or_class = name_or_class.__name__
    if called and one_time:
        return
    if one_time:
        self._onetime_callbacks[name_or_class].append(callback)
    else:
        self._callbacks[name_or_class].append(callback)

execute_model_callbacks ¶

execute_model_callbacks(model_class)
Source code in edgy/core/connection/registry.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
def execute_model_callbacks(self, model_class: type["BaseModelType"]) -> None:
    name = model_class.__name__
    callbacks = self._onetime_callbacks.get(name)
    while callbacks:
        callbacks.pop()(model_class)

    callbacks = self._onetime_callbacks.get(None)
    while callbacks:
        callbacks.pop()(model_class)

    callbacks = self._callbacks.get(name)
    if callbacks:
        for callback in callbacks:
            callback(model_class)

    callbacks = self._callbacks.get(None)
    if callbacks:
        for callback in callbacks:
            callback(model_class)

init_models ¶

init_models(*, init_column_mappers=True, init_class_attrs=True)

Initializes lazy parts of models meta. Normally not needed to call.

Source code in edgy/core/connection/registry.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
def init_models(
    self, *, init_column_mappers: bool = True, init_class_attrs: bool = True
) -> None:
    """
    Initializes lazy parts of models meta. Normally not needed to call.
    """
    for model_class in self.models.values():
        model_class.meta.full_init(
            init_column_mappers=init_column_mappers, init_class_attrs=init_class_attrs
        )

    for model_class in self.reflected.values():
        model_class.meta.full_init(
            init_column_mappers=init_column_mappers, init_class_attrs=init_class_attrs
        )

invalidate_models ¶

invalidate_models(*, clear_class_attrs=True)

Invalidate all lazy parts of meta. They will automatically re-initialized on access.

Source code in edgy/core/connection/registry.py
586
587
588
589
590
591
592
593
def invalidate_models(self, *, clear_class_attrs: bool = True) -> None:
    """
    Invalidate all lazy parts of meta. They will automatically re-initialized on access.
    """
    for model_class in self.models.values():
        model_class.meta.invalidate(clear_class_attrs=clear_class_attrs)
    for model_class in self.reflected.values():
        model_class.meta.invalidate(clear_class_attrs=clear_class_attrs)

get_tablenames ¶

get_tablenames()
Source code in edgy/core/connection/registry.py
595
596
597
598
599
600
601
def get_tablenames(self) -> set[str]:
    return_set = set()
    for model_class in self.models.values():
        return_set.add(model_class.meta.tablename)
    for model_class in self.reflected.values():
        return_set.add(model_class.meta.tablename)
    return return_set

_automigrate_update ¶

_automigrate_update(migration_settings)
Source code in edgy/core/connection/registry.py
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
def _automigrate_update(
    self,
    migration_settings: "EdgySettings",
) -> None:
    from edgy import Instance, monkay
    from edgy.cli.base import upgrade

    with (
        monkay.with_extensions({}),
        monkay.with_settings(migration_settings),
        monkay.with_instance(Instance(registry=self), apply_extensions=False),
    ):
        self._is_automigrated = True
        monkay.evaluate_settings()
        monkay.apply_extensions()
        upgrade()

_automigrate async ¶

_automigrate()
Source code in edgy/core/connection/registry.py
620
621
622
623
624
625
626
627
628
async def _automigrate(self) -> None:
    from edgy import monkay

    migration_settings = self._automigrate_config
    if migration_settings is None or not monkay.settings.allow_automigrations:
        self._is_automigrated = True
        return

    await asyncio.to_thread(self._automigrate_update, migration_settings)

_connect_and_init async ¶

_connect_and_init(name, database)
Source code in edgy/core/connection/registry.py
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
async def _connect_and_init(self, name: Union[str, None], database: "Database") -> None:
    from edgy.core.db.models.metaclasses import MetaInfo

    await database.connect()
    if not self._is_automigrated:
        await self._automigrate()
    if not self.pattern_models or name in self.dbs_reflected:
        return
    schemes = set()
    for pattern_model in self.pattern_models.values():
        if name not in pattern_model.meta.databases:
            continue
        schemes.update(pattern_model.meta.schemes)
    tmp_metadata = sqlalchemy.MetaData()
    for schema in schemes:
        await database.run_sync(tmp_metadata.reflect, schema=schema)
    try:
        for table in tmp_metadata.tables.values():
            for pattern_model in self.pattern_models.values():
                if name not in pattern_model.meta.databases or table.schema not in schemes:
                    continue
                assert pattern_model.meta.model is pattern_model
                # table.key would contain the schema name
                if not pattern_model.meta.include_pattern.match(table.name) or (
                    pattern_model.meta.exclude_pattern
                    and pattern_model.meta.exclude_pattern.match(table.name)
                ):
                    continue
                if pattern_model.fields_not_supported_by_table(table):
                    continue
                new_name = pattern_model.meta.template(table)
                old_model: Optional[type[BaseModelType]] = None
                with contextlib.suppress(LookupError):
                    old_model = self.get_model(
                        new_name, include_content_type_attr=False, exclude=("pattern_models",)
                    )
                if old_model is not None:
                    raise Exception(
                        f"Conflicting model: {old_model.__name__} with pattern model: {pattern_model.__name__}"
                    )
                concrete_reflect_model = pattern_model.copy_edgy_model(
                    name=new_name, meta_info_class=MetaInfo
                )
                concrete_reflect_model.meta.no_copy = True
                concrete_reflect_model.meta.tablename = table.name
                concrete_reflect_model.__using_schema__ = table.schema
                concrete_reflect_model.add_to_registry(self, database=database)

        self.dbs_reflected.add(name)
    except BaseException as exc:
        await database.disconnect()
        raise exc

__aenter__ async ¶

__aenter__()
Source code in edgy/core/connection/registry.py
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
async def __aenter__(self) -> "Registry":
    dbs: list[tuple[Union[str, None], Database]] = [(None, self.database)]
    for name, db in self.extra.items():
        dbs.append((name, db))
    ops = [self._connect_and_init(name, db) for name, db in dbs]
    results: list[Union[BaseException, bool]] = await asyncio.gather(
        *ops, return_exceptions=True
    )
    if any(isinstance(x, BaseException) for x in results):
        ops2 = []
        for num, value in enumerate(results):
            if not isinstance(value, BaseException):
                ops2.append(dbs[num][1].disconnect())
            else:
                logger.opt(exception=value).error("Failed to connect database.")
        await asyncio.gather(*ops2)
    return self

__aexit__ async ¶

__aexit__(exc_type=None, exc_value=None, traceback=None)
Source code in edgy/core/connection/registry.py
701
702
703
704
705
706
707
708
709
710
async def __aexit__(
    self,
    exc_type: Optional[type[BaseException]] = None,
    exc_value: Optional[BaseException] = None,
    traceback: Optional[TracebackType] = None,
) -> None:
    ops = [self.database.disconnect()]
    for value in self.extra.values():
        ops.append(value.disconnect())
    await asyncio.gather(*ops)

with_async_env ¶

with_async_env(loop=None)
Source code in edgy/core/connection/registry.py
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
@contextlib.contextmanager
def with_async_env(
    self, loop: Optional[asyncio.AbstractEventLoop] = None
) -> Generator["Registry", None, None]:
    close: bool = False
    if loop is None:
        try:
            loop = asyncio.get_running_loop()
            # when in async context we don't create a loop
        except RuntimeError:
            # also when called recursively and current_eventloop is available
            loop = current_eventloop.get()
            if loop is None:
                loop = asyncio.new_event_loop()
                close = True

    token = current_eventloop.set(loop)
    try:
        yield run_sync(self.__aenter__(), loop=loop)
    finally:
        run_sync(self.__aexit__(), loop=loop)
        current_eventloop.reset(token)
        if close:
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.close()

asgi ¶

asgi(app: None, handle_lifespan: bool = False) -> Callable[[ASGIApp], ASGIHelper]
asgi(app: ASGIApp, handle_lifespan: bool = False) -> ASGIHelper
asgi(app=None, handle_lifespan=False)

Return wrapper for asgi integration.

Source code in edgy/core/connection/registry.py
752
753
754
755
756
757
758
759
760
def asgi(
    self,
    app: Optional[ASGIApp] = None,
    handle_lifespan: bool = False,
) -> Union[ASGIHelper, Callable[[ASGIApp], ASGIHelper]]:
    """Return wrapper for asgi integration."""
    if app is not None:
        return ASGIHelper(app=app, registry=self, handle_lifespan=handle_lifespan)
    return partial(ASGIHelper, registry=self, handle_lifespan=handle_lifespan)

create_all async ¶

create_all(refresh_metadata=True, databases=(None,))
Source code in edgy/core/connection/registry.py
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
async def create_all(
    self, refresh_metadata: bool = True, databases: Sequence[Union[str, None]] = (None,)
) -> None:
    # otherwise old references to non-existing tables, fks can lurk around
    if refresh_metadata:
        self.refresh_metadata(multi_schema=True)
    if self.db_schema:
        await self.schema.create_schema(
            self.db_schema, True, True, update_cache=True, databases=databases
        )
    else:
        # fallback when no schemes are in use. Because not all dbs support schemes
        # we cannot just use a scheme = ""
        for database in databases:
            db = self.database if database is None else self.extra[database]
            # don't warn here about inperformance
            async with db as db:
                with db.force_rollback(False):
                    await db.create_all(self.metadata_by_name[database])

drop_all async ¶

drop_all(databases=(None,))
Source code in edgy/core/connection/registry.py
782
783
784
785
786
787
788
789
790
791
792
793
794
795
async def drop_all(self, databases: Sequence[Union[str, None]] = (None,)) -> None:
    if self.db_schema:
        await self.schema.drop_schema(
            self.db_schema, cascade=True, if_exists=True, databases=databases
        )
    else:
        # fallback when no schemes are in use. Because not all dbs support schemes
        # we cannot just use a scheme = ""
        for database_name in databases:
            db = self.database if database_name is None else self.extra[database_name]
            # don't warn here about inperformance
            async with db as db:
                with db.force_rollback(False):
                    await db.drop_all(self.metadata_by_name[database_name])