Skip to content

Registry class

edgy.Registry

Registry(database, *, with_content_type=False, schema=None, extra=None, **kwargs)

The command center for the models of Edgy.

PARAMETER DESCRIPTION
database

TYPE: Union[Database, str, DatabaseURL]

with_content_type

TYPE: Union[bool, type[BaseModelType]] DEFAULT: False

schema

TYPE: Union[str, None] DEFAULT: None

extra

TYPE: Optional[dict[str, Database]] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/connection/registry.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def __init__(
    self,
    database: Union[Database, str, DatabaseURL],
    *,
    with_content_type: Union[bool, type["BaseModelType"]] = False,
    schema: Union[str, None] = None,
    extra: Optional[dict[str, Database]] = None,
    **kwargs: Any,
) -> None:
    """
    Set up an empty registry around a main database.

    ``database`` is used as-is when it already is a ``Database``; otherwise it is
    wrapped as ``Database(database, **kwargs)``. Values of ``extra`` which are not
    ``Database`` instances are wrapped as ``Database(value)`` (``kwargs`` are not
    forwarded to them). ``schema`` is stored as ``db_schema``. Unless
    ``with_content_type`` is ``False`` it is passed to ``_set_content_type``.
    """
    self.db_schema = schema
    extra_databases = extra or {}
    if isinstance(database, Database):
        self.database: Database = database
    else:
        self.database = Database(database, **kwargs)

    # model containers start out empty
    self.models: dict[str, type[BaseModelType]] = {}
    self.reflected: dict[str, type[BaseModelType]] = {}
    self.tenant_models: dict[str, type[BaseModelType]] = {}
    self.pattern_models: dict[str, type[AutoReflectionModel]] = {}
    self.dbs_reflected = set()

    self.schema = Schema(registry=self)

    # Callbacks are executed when a Model or Reflected Model is set.
    # Note: they are only executed if the Model is not in the Registry yet.
    self._onetime_callbacks: dict[
        Union[str, None], list[Callable[[type[BaseModelType]], None]]
    ] = defaultdict(list)
    self._callbacks: dict[
        Union[str, None], list[Callable[[type[BaseModelType]], None]]
    ] = defaultdict(list)

    # build the mapping completely before publishing it on the instance
    wrapped_extra: dict[str, Database] = {}
    for key, value in extra_databases.items():
        wrapped_extra[key] = value if isinstance(value, Database) else Database(value)
    self.extra: dict[str, Database] = wrapped_extra
    self.metadata_by_url = MetaDataByUrlDict(registry=self)

    if with_content_type is not False:
        self._set_content_type(with_content_type)

db_schema class-attribute instance-attribute

db_schema = schema

content_type class-attribute instance-attribute

content_type = None

dbs_reflected instance-attribute

dbs_reflected = set()

database instance-attribute

database = database if isinstance(database, Database) else Database(database, **kwargs)

models instance-attribute

models = {}

reflected instance-attribute

reflected = {}

tenant_models instance-attribute

tenant_models = {}

pattern_models instance-attribute

pattern_models = {}

schema instance-attribute

schema = Schema(registry=self)

_onetime_callbacks instance-attribute

_onetime_callbacks = defaultdict(list)

_callbacks instance-attribute

_callbacks = defaultdict(list)

extra instance-attribute

extra = {k: v if isinstance(v, Database) else Database(v) for k, v in extra.items()}

metadata_by_url instance-attribute

metadata_by_url = MetaDataByUrlDict(registry=self)

metadata_by_name property writable

metadata_by_name

metadata property

metadata

declarative_base cached property

declarative_base

engine property

engine

sync_engine property

sync_engine

__copy__

__copy__()
Source code in edgy/core/connection/registry.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def __copy__(self) -> "Registry":
    """
    Build a new ``Registry`` with the same main database, schema and extra
    databases, holding copies of the models of this registry.

    When a content type is set, a copy of the registered ``ContentType`` model
    (stripped of its ``reverse_*`` fields) is passed on; if no such model can be
    looked up, the current ``content_type`` attribute is passed on instead.
    """
    content_type: Union[bool, type[BaseModelType]] = False
    if self.content_type is not None:
        try:
            copied_content_type = self.get_model(
                "ContentType", include_content_type_attr=False
            ).copy_edgy_model()
            content_type = copied_content_type
            # drop the reverse relations from the content type copy
            reverse_names = [
                field_name
                for field_name in copied_content_type.meta.fields.keys()
                if field_name.startswith("reverse_")
            ]
            for field_name in reverse_names:
                del copied_content_type.meta.fields[field_name]
        except LookupError:
            content_type = self.content_type

    duplicate = Registry(
        self.database, with_content_type=content_type, schema=self.db_schema, extra=self.extra
    )
    for container_name in ("models", "reflected", "tenant_models", "pattern_models"):
        target = getattr(duplicate, container_name)
        for key, model in getattr(self, container_name).items():
            # entries already present in the copy are left untouched
            if key not in target:
                target[key] = model.copy_edgy_model(duplicate)
    duplicate.dbs_reflected = set(self.dbs_reflected)
    return duplicate

_set_content_type

_set_content_type(with_content_type, old_content_type_to_replace=None)
PARAMETER DESCRIPTION
with_content_type

TYPE: Union[Literal[True], type[BaseModelType]]

old_content_type_to_replace

TYPE: Optional[type[BaseModelType]] DEFAULT: None

Source code in edgy/core/connection/registry.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def _set_content_type(
    self,
    with_content_type: Union[Literal[True], type["BaseModelType"]],
    old_content_type_to_replace: Optional[type["BaseModelType"]] = None,
) -> None:
    """
    Install a content type model on this registry and register a callback
    which wires models to it.

    ``with_content_type`` is either ``True`` (the ``ContentType`` model from
    ``edgy.contrib.contenttypes.models`` is used) or a model class. An abstract
    model is turned into a concrete ``"ContentType"`` model with the table name
    ``"contenttypes"`` bound to this registry; a model without a registry is
    added to this registry under the name ``"ContentType"``. The result is
    stored in ``self.content_type``.

    ``old_content_type_to_replace``: when given, existing content type fields
    whose target is this model are re-pointed to the new content type.
    """
    # imported locally
    # NOTE(review): presumably to avoid import cycles — confirm
    from edgy.contrib.contenttypes.fields import BaseContentTypeFieldField, ContentTypeField
    from edgy.contrib.contenttypes.models import ContentType
    from edgy.core.db.models.metaclasses import MetaInfo
    from edgy.core.db.relationships.related_field import RelatedField
    from edgy.core.utils.models import create_edgy_model

    if with_content_type is True:
        with_content_type = ContentType

    real_content_type: type[BaseModelType] = with_content_type

    if real_content_type.meta.abstract:
        # abstract model: derive a concrete model from it for this registry
        meta_args = {
            "tablename": "contenttypes",
            "registry": self,
        }

        new_meta: MetaInfo = MetaInfo(None, **meta_args)
        # model adds itself to registry and executes callbacks
        real_content_type = create_edgy_model(
            "ContentType",
            with_content_type.__module__,
            __metadata__=new_meta,
            __bases__=(with_content_type,),
        )
    elif real_content_type.meta.registry is None:
        # concrete model which is not bound to any registry yet
        real_content_type.add_to_registry(self, "ContentType")
    self.content_type = real_content_type

    def callback(model_class: type["BaseModelType"]) -> None:
        """Attach ``model_class`` to the content type unless it is already handled."""
        # ContentType subclasses are never modified here
        if issubclass(model_class, ContentType):
            return
        # A model which already carries a content type field is not given a new one.
        # Only a field still targeting the replaced content type is re-pointed.
        for field in model_class.meta.fields.values():
            if isinstance(field, BaseContentTypeFieldField):
                if (
                    old_content_type_to_replace is not None
                    and field.target is old_content_type_to_replace
                ):
                    field.target_registry = self
                    field.target = real_content_type
                    # simply overwrite the reverse relation on the new content type
                    real_content_type.meta.fields[field.related_name] = RelatedField(
                        name=field.related_name,
                        foreign_key_name=field.name,
                        related_from=model_class,
                        owner=real_content_type,
                    )
                # stop after the first content type field, replaced or not
                return

        # a field named "content_type" of another kind exists (e.g. an exclude field)
        if "content_type" in model_class.meta.fields:
            return
        related_name = f"reverse_{model_class.__name__.lower()}"
        # the reverse relation name is derived from the lowercased model name only
        assert (
            related_name not in real_content_type.meta.fields
        ), f"duplicate model name: {model_class.__name__}"

        field_args: dict[str, Any] = {
            "name": "content_type",
            "owner": model_class,
            "to": real_content_type,
            "no_constraint": real_content_type.no_constraint,
            "no_copy": True,
        }
        # model and content type live in different registries
        if model_class.meta.registry is not real_content_type.meta.registry:
            field_args["relation_has_post_delete_callback"] = True
            field_args["force_cascade_deletion_relation"] = True
        model_class.meta.fields["content_type"] = cast(
            "BaseFieldType",
            ContentTypeField(**field_args),
        )
        # matching reverse relation on the content type model
        real_content_type.meta.fields[related_name] = RelatedField(
            name=related_name,
            foreign_key_name="content_type",
            related_from=model_class,
            owner=real_content_type,
        )

    # general (name None), persistent callback
    self.register_callback(None, callback, one_time=False)

get_model

get_model(model_name, *, include_content_type_attr=True)
PARAMETER DESCRIPTION
model_name

TYPE: str

include_content_type_attr

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/registry.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def get_model(
    self, model_name: str, *, include_content_type_attr: bool = True
) -> type["BaseModelType"]:
    """
    Look up a model class by name.

    The name ``"ContentType"`` resolves to ``self.content_type`` when that is set
    and ``include_content_type_attr`` is true. Otherwise ``models``, ``reflected``
    and ``tenant_models`` are searched in this order.

    Raises:
        LookupError: no model with this name was found.
    """
    wants_content_type = include_content_type_attr and model_name == "ContentType"
    if wants_content_type and self.content_type is not None:
        return self.content_type

    for container_name in ("models", "reflected", "tenant_models"):
        container = getattr(self, container_name)
        if model_name in container:
            return container[model_name]

    raise LookupError(f"Registry doesn't have a {model_name} model.") from None

refresh_metadata

refresh_metadata(*, update_only=False, multi_schema=False, ignore_schema_pattern='information_schema')
PARAMETER DESCRIPTION
update_only

TYPE: bool DEFAULT: False

multi_schema

TYPE: Union[bool, Pattern, str] DEFAULT: False

ignore_schema_pattern

TYPE: Union[None, Pattern, str] DEFAULT: 'information_schema'

Source code in edgy/core/connection/registry.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def refresh_metadata(
    self,
    *,
    update_only: bool = False,
    multi_schema: Union[bool, re.Pattern, str] = False,
    ignore_schema_pattern: Union[None, "re.Pattern", str] = "information_schema",
) -> None:
    """
    Rebuild the tables of the registered models in the registry metadata.

    Unless ``update_only`` is set, every metadata object is cleared and the cached
    ``_table`` / ``_db_schemas`` of all models (including reflected ones) are reset.
    With ``multi_schema`` left at ``False`` each model is built for the registry
    schema on the main database, or for schema ``None`` on an extra database.
    Otherwise the schemes reported by ``self.schema.get_schemes_tree`` are used,
    filtered by ``multi_schema`` (``True`` or a pattern) and ``ignore_schema_pattern``.
    String patterns are compiled with ``re.compile``.
    """
    if not update_only:
        for metadata in self.metadata_by_name.values():
            metadata.clear()

    main_url = str(self.database.url)
    schemes_tree: dict[str, tuple[Optional[str], list[str]]]
    if multi_schema is False:
        # main database first, so an extra database with the same url overrides it
        schemes_tree = {main_url: (None, [self.db_schema])}
        for extra_name, extra_db in self.extra.items():
            schemes_tree[str(extra_db.url)] = (extra_name, [None])
    else:
        discovered = run_sync(self.schema.get_schemes_tree(no_reflect=True))
        schemes_tree = {entry[0]: (name, entry[2]) for name, entry in discovered.items()}

    schema_filter = re.compile(multi_schema) if isinstance(multi_schema, str) else multi_schema
    ignore_filter = (
        re.compile(ignore_schema_pattern)
        if isinstance(ignore_schema_pattern, str)
        else ignore_schema_pattern
    )

    def _is_skipped(model_class: Any, schema: Any) -> bool:
        """Tell whether ``schema`` is filtered out for ``model_class``."""
        if schema_filter is False:
            # single schema mode: nothing is filtered
            return False
        if schema_filter is not True and schema_filter.match(schema) is None:
            return True
        if ignore_filter is not None and ignore_filter.match(schema) is not None:
            return True
        if getattr(model_class.meta, "is_tenant", False):
            return False
        using_schema = model_class.__using_schema__
        if using_schema is Undefined or using_schema is None:
            # models without an explicit schema only go to the "" schema
            return schema != ""
        return using_schema != schema

    for model_class in self.models.values():
        if not update_only:
            model_class._table = None
            model_class._db_schemas = {}
        url = str(model_class.database.url)
        if url not in schemes_tree:
            continue
        _, schemes = schemes_tree[url]
        for schema in schemes:
            if _is_skipped(model_class, schema):
                continue
            model_class.table_schema(schema=schema, metadata=self.metadata_by_url[url])

    # reflected models are only reset, not initialized, to keep the metadata clean
    if not update_only:
        for reflected_class in self.reflected.values():
            reflected_class._table = None
            reflected_class._db_schemas = {}

register_callback

register_callback(name_or_class, callback, one_time=None)
PARAMETER DESCRIPTION
name_or_class

TYPE: Union[type[BaseModelType], str, None]

callback

TYPE: Callable[[type[BaseModelType]], None]

one_time

TYPE: Optional[bool] DEFAULT: None

Source code in edgy/core/connection/registry.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def register_callback(
    self,
    name_or_class: Union[type["BaseModelType"], str, None],
    callback: Callable[[type["BaseModelType"]], None],
    one_time: Optional[bool] = None,
) -> None:
    """
    Register ``callback`` for one model or, with ``None``, for all models.

    The callback is invoked right away for what is already available: all
    models, reflected models and tenant-only models for ``None``, the class
    itself when a class is passed, or the model found under the given name.
    A one-time callback which was invoked is not stored; every other callback
    is stored under the model name (or ``None``).

    ``one_time`` defaults to ``True`` for model specific callbacks and to
    ``False`` for general callbacks.
    """
    if one_time is None:
        one_time = name_or_class is not None

    fired = False
    key: Union[str, None]
    if name_or_class is None:
        key = None
        for registered in self.models.values():
            callback(registered)
            fired = True
        for registered in self.reflected.values():
            callback(registered)
            fired = True
        for tenant_name, registered in self.tenant_models.items():
            # tenant models which are also regular models were handled above
            if tenant_name not in self.models:
                callback(registered)
                fired = True
    elif isinstance(name_or_class, str):
        key = name_or_class
        try:
            found = self.get_model(name_or_class)
        except LookupError:
            found = None
        if found is not None:
            callback(found)
            fired = True
    else:
        callback(name_or_class)
        fired = True
        key = name_or_class.__name__

    if one_time:
        # a one-time callback which already ran is done
        if not fired:
            self._onetime_callbacks[key].append(callback)
    else:
        self._callbacks[key].append(callback)

execute_model_callbacks

execute_model_callbacks(model_class)
PARAMETER DESCRIPTION
model_class

TYPE: type[BaseModelType]

Source code in edgy/core/connection/registry.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def execute_model_callbacks(self, model_class: type["BaseModelType"]) -> None:
    """
    Run the callbacks registered for ``model_class``.

    One-time callbacks (first those stored under the model name, then the
    general ones stored under ``None``) are removed while being executed, last
    registered first. Afterwards the persistent callbacks run in registration
    order, again model specific before general.
    """
    name = model_class.__name__

    for key in (name, None):
        pending = self._onetime_callbacks.get(key)
        while pending:
            run_once = pending.pop()
            run_once(model_class)

    for key in (name, None):
        for persistent in self._callbacks.get(key) or ():
            persistent(model_class)

init_models

init_models(*, init_column_mappers=True, init_class_attrs=True)

Initializes lazy parts of models meta. Normally not needed to call.

PARAMETER DESCRIPTION
init_column_mappers

TYPE: bool DEFAULT: True

init_class_attrs

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/registry.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def init_models(
    self, *, init_column_mappers: bool = True, init_class_attrs: bool = True
) -> None:
    """
    Fully initialize the lazy parts of the meta of all models and reflected
    models. Usually there is no need to call this.
    """
    options = {
        "init_column_mappers": init_column_mappers,
        "init_class_attrs": init_class_attrs,
    }
    for container_name in ("models", "reflected"):
        for model_class in getattr(self, container_name).values():
            model_class.meta.full_init(**options)

invalidate_models

invalidate_models(*, clear_class_attrs=True)

Invalidate all lazy parts of meta. They will automatically be re-initialized on access.

PARAMETER DESCRIPTION
clear_class_attrs

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/registry.py
452
453
454
455
456
457
458
459
def invalidate_models(self, *, clear_class_attrs: bool = True) -> None:
    """
    Invalidate the lazy parts of the meta of all models and reflected models.
    They are re-initialized automatically on access.
    """
    for container_name in ("models", "reflected"):
        for model_class in getattr(self, container_name).values():
            model_class.meta.invalidate(clear_class_attrs=clear_class_attrs)

get_tablenames

get_tablenames()
Source code in edgy/core/connection/registry.py
461
462
463
464
465
466
467
def get_tablenames(self) -> set[str]:
    """Return the table names of all models in ``models`` and ``reflected``."""
    tablenames = {model_class.meta.tablename for model_class in self.models.values()}
    tablenames.update(model_class.meta.tablename for model_class in self.reflected.values())
    return tablenames

_connect_and_init async

_connect_and_init(name, database)
PARAMETER DESCRIPTION
name

TYPE: Union[str, None]

database

TYPE: Database

Source code in edgy/core/connection/registry.py
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
async def _connect_and_init(self, name: Union[str, None], database: "Database") -> None:
    """
    Connect ``database`` and, once per database name, create concrete reflected
    models from the registered pattern models.

    Nothing beyond connecting happens when there are no pattern models or when
    ``name`` is already in ``dbs_reflected``. Otherwise the schemes of all pattern
    models which list ``name`` in their databases are reflected, and every
    matching table yields a copy of the pattern model which is added to this
    registry.

    Raises:
        Exception: a generated model name collides with an existing model.

    Any error raised after the connection was established (including errors
    during reflection) disconnects ``database`` again before it propagates.
    """
    from edgy.core.db.models.metaclasses import MetaInfo

    await database.connect()
    if not self.pattern_models or name in self.dbs_reflected:
        return
    # Everything after a successful connect is guarded, so a failure never
    # leaves the connection acquired above behind.
    try:
        schemes = set()
        for pattern_model in self.pattern_models.values():
            if name not in pattern_model.meta.databases:
                continue
            schemes.update(pattern_model.meta.schemes)
        tmp_metadata = sqlalchemy.MetaData()
        for schema in schemes:
            await database.run_sync(tmp_metadata.reflect, schema=schema)
        for table in tmp_metadata.tables.values():
            for pattern_model in self.pattern_models.values():
                if name not in pattern_model.meta.databases or table.schema not in schemes:
                    continue
                assert pattern_model.meta.model is pattern_model
                # table.key would contain the schema name
                if not pattern_model.meta.include_pattern.match(table.name) or (
                    pattern_model.meta.exclude_pattern
                    and pattern_model.meta.exclude_pattern.match(table.name)
                ):
                    continue
                if pattern_model.fields_not_supported_by_table(table):
                    continue
                new_name = pattern_model.meta.template(table)
                old_model: Optional[type[BaseModelType]] = None
                with contextlib.suppress(LookupError):
                    old_model = self.get_model(new_name)
                if old_model is not None:
                    raise Exception(
                        f"Conflicting model: {old_model.__name__} with pattern model: {pattern_model.__name__}"
                    )
                concrete_reflect_model = pattern_model.copy_edgy_model(
                    name=new_name, meta_info_class=MetaInfo
                )
                concrete_reflect_model.meta.tablename = table.name
                concrete_reflect_model.__using_schema__ = table.schema
                concrete_reflect_model.add_to_registry(self, database=database)

        # only marked as reflected when every table was processed
        self.dbs_reflected.add(name)
    except BaseException:
        await database.disconnect()
        # bare raise keeps the original traceback untouched
        raise

__aenter__ async

__aenter__()
Source code in edgy/core/connection/registry.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
async def __aenter__(self) -> "Registry":
    """
    Connect the main database and all extra databases concurrently.

    If at least one of them fails, each failure is logged and the databases
    which did connect are disconnected again. The registry is returned in
    either case.
    """
    dbs: list[tuple[Union[str, None], Database]] = [(None, self.database), *self.extra.items()]
    results: list[Union[BaseException, bool]] = await asyncio.gather(
        *[self._connect_and_init(name, db) for name, db in dbs], return_exceptions=True
    )
    if any(isinstance(outcome, BaseException) for outcome in results):
        rollbacks = []
        for (_, db), outcome in zip(dbs, results):
            if isinstance(outcome, BaseException):
                logger.opt(exception=outcome).error("Failed to connect database.")
            else:
                # this one succeeded, so undo the connect
                rollbacks.append(db.disconnect())
        await asyncio.gather(*rollbacks)
    return self

__aexit__ async

__aexit__(exc_type=None, exc_value=None, traceback=None)
PARAMETER DESCRIPTION
exc_type

TYPE: Optional[type[BaseException]] DEFAULT: None

exc_value

TYPE: Optional[BaseException] DEFAULT: None

traceback

TYPE: Optional[TracebackType] DEFAULT: None

Source code in edgy/core/connection/registry.py
535
536
537
538
539
540
541
542
543
544
async def __aexit__(
    self,
    exc_type: Optional[type[BaseException]] = None,
    exc_value: Optional[BaseException] = None,
    traceback: Optional[TracebackType] = None,
) -> None:
    """Disconnect the main database and all extra databases concurrently."""
    pending = [
        self.database.disconnect(),
        *(extra_db.disconnect() for extra_db in self.extra.values()),
    ]
    await asyncio.gather(*pending)

asgi

asgi(app: None, handle_lifespan: bool = False) -> Callable[[ASGIApp], ASGIHelper]
asgi(app: ASGIApp, handle_lifespan: bool = False) -> ASGIHelper
asgi(app=None, handle_lifespan=False)

Return wrapper for asgi integration.

PARAMETER DESCRIPTION
app

TYPE: Optional[ASGIApp] DEFAULT: None

handle_lifespan

TYPE: bool DEFAULT: False

Source code in edgy/core/connection/registry.py
560
561
562
563
564
565
566
567
568
def asgi(
    self,
    app: Optional[ASGIApp] = None,
    handle_lifespan: bool = False,
) -> Union[ASGIHelper, Callable[[ASGIApp], ASGIHelper]]:
    """
    Return wrapper for asgi integration.

    Without ``app`` a factory is returned which builds the ``ASGIHelper`` once
    it is given an app; with ``app`` the ``ASGIHelper`` is built right away.
    """
    if app is None:
        return partial(ASGIHelper, registry=self, handle_lifespan=handle_lifespan)
    return ASGIHelper(app=app, registry=self, handle_lifespan=handle_lifespan)

create_all async

create_all(refresh_metadata=True, databases=(None,))
PARAMETER DESCRIPTION
refresh_metadata

TYPE: bool DEFAULT: True

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/registry.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
async def create_all(
    self, refresh_metadata: bool = True, databases: Sequence[Union[str, None]] = (None,)
) -> None:
    """
    Create the tables of the registry on the given databases.

    ``databases`` holds names of extra databases; ``None`` stands for the main
    database. With a registry schema set, creation is delegated to
    ``self.schema.create_schema``; otherwise the metadata of each database is
    created directly.
    """
    # otherwise old references to non-existing tables, fks can lurk around
    if refresh_metadata:
        self.refresh_metadata(multi_schema=True)

    if self.db_schema:
        await self.schema.create_schema(
            self.db_schema, True, True, update_cache=True, databases=databases
        )
        return

    # Fallback when no schemes are in use: not all dbs support schemes,
    # so a scheme of "" cannot simply be used.
    for database_name in databases:
        target = self.database if database_name is None else self.extra[database_name]
        # don't warn here about inperformance
        async with target as connected:
            with connected.force_rollback(False):
                await connected.create_all(self.metadata_by_name[database_name])

drop_all async

drop_all(databases=(None,))
PARAMETER DESCRIPTION
databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/registry.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
async def drop_all(self, databases: Sequence[Union[str, None]] = (None,)) -> None:
    """
    Drop the tables of the registry on the given databases.

    ``databases`` holds names of extra databases; ``None`` stands for the main
    database. With a registry schema set, the drop is delegated to
    ``self.schema.drop_schema``; otherwise the metadata of each database is
    dropped directly.
    """
    if self.db_schema:
        await self.schema.drop_schema(
            self.db_schema, cascade=True, if_exists=True, databases=databases
        )
        return

    # Fallback when no schemes are in use: not all dbs support schemes,
    # so a scheme of "" cannot simply be used.
    for database_name in databases:
        target = self.database if database_name is None else self.extra[database_name]
        # don't warn here about inperformance
        async with target as connected:
            with connected.force_rollback(False):
                await connected.drop_all(self.metadata_by_name[database_name])