ReflectModel class

edgy.ReflectModel

ReflectModel(*args, __show_pk__=False, __phase__='init', **kwargs)

Bases: ReflectedModelMixin, Model

Reflection on async engines is not yet supported; therefore, a sync_engine call is required.

PARAMETER DESCRIPTION
*args

TYPE: Any DEFAULT: ()

__show_pk__

TYPE: bool DEFAULT: False

__phase__

TYPE: str DEFAULT: 'init'

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/base.py
def __init__(
    self, *args: Any, __show_pk__: bool = False, __phase__: str = "init", **kwargs: Any
) -> None:
    self.__show_pk__ = __show_pk__
    # always set them in __dict__ to prevent __getattr__ loop
    self._loaded_or_deleted = False
    # inject in relation fields anonymous ModelRef (without a Field)
    for arg in args:
        if isinstance(arg, ModelRef):
            relation_field = self.meta.fields[arg.__related_name__]
            extra_params = {}
            try:
                # m2m or foreign key
                target_model_class = relation_field.target
            except AttributeError:
                # reverse m2m or foreign key
                target_model_class = relation_field.related_from
            if not relation_field.is_m2m:
                # sometimes the foreign key is required, so set it already
                extra_params[relation_field.foreign_key.name] = self
            model = target_model_class(
                **arg.model_dump(exclude={"__related_name__"}),
                **extra_params,
            )
            existing: Any = kwargs.get(arg.__related_name__)
            if isinstance(existing, Sequence):
                existing = [*existing, model]
            elif existing is None:
                existing = [model]
            else:
                existing = [existing, model]
            kwargs[arg.__related_name__] = existing

    kwargs = self.transform_input(kwargs, phase=__phase__, instance=self)
    super().__init__(**kwargs)
    # move to dict (e.g. reflected or subclasses which allow extra attributes)
    if self.__pydantic_extra__ is not None:
        # default was triggered
        self.__dict__.update(self.__pydantic_extra__)
        self.__pydantic_extra__ = None

    # cleanup fields
    for field_name in self.meta.fields:
        if field_name not in kwargs:
            self.__dict__.pop(field_name, None)
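
A minimal definition sketch (the database URL, registry and table name are assumptions for illustration): a ReflectModel declares only the fields you want validated, while the remaining columns are reflected from the existing table.

import edgy

database = edgy.Database("sqlite:///existing.sqlite")  # assumed connection string
registry = edgy.Registry(database=database)


class User(edgy.ReflectModel):
    # Only declare the fields you care about; the remaining columns
    # are picked up by reflecting the existing table.
    name: str = edgy.CharField(max_length=255, null=True)

    class Meta:
        tablename = "users"  # assumed pre-existing table
        registry = registry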

columns class-attribute

columns

database class-attribute

database = None

table property writable

table

pkcolumns property

pkcolumns

pknames property

pknames

query class-attribute

query = Manager()

query_related class-attribute

query_related = RedirectManager(redirect_name='query')

meta class-attribute

meta = MetaInfo(None, abstract=True, registry=False)

__parent__ class-attribute

__parent__ = None

__is_proxy_model__ class-attribute

__is_proxy_model__ = False

__require_model_based_deletion__ class-attribute

__require_model_based_deletion__ = False

__reflected__ class-attribute

__reflected__ = True

_db_schemas class-attribute

_db_schemas

proxy_model property

proxy_model

identifying_db_fields cached property

identifying_db_fields

The columns used for loading. Can be set per instance; defaults to pknames.

can_load property

can_load

__proxy_model__ class-attribute

__proxy_model__ = None

__show_pk__ class-attribute instance-attribute

__show_pk__ = __show_pk__

__using_schema__ class-attribute instance-attribute

__using_schema__ = Undefined

_loaded_or_deleted class-attribute instance-attribute

_loaded_or_deleted = False

__pydantic_extra__ instance-attribute

__pydantic_extra__ = None

signals property

signals

fields property

fields

_removed_copy_keys class-attribute

_removed_copy_keys = _removed_copy_keys

Meta

abstract class-attribute instance-attribute

abstract = True

registry class-attribute instance-attribute

registry = False

transaction

transaction(*, force_rollback=False, **kwargs)

Return database transaction for the assigned database

PARAMETER DESCRIPTION
force_rollback

TYPE: bool DEFAULT: False

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/mixins/db.py
def transaction(self, *, force_rollback: bool = False, **kwargs: Any) -> "Transaction":
    """Return database transaction for the assigned database"""
    return cast(
        "Transaction", self.database.transaction(force_rollback=force_rollback, **kwargs)
    )
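
A usage sketch (the user instance and field name are assumptions): the returned transaction is an async context manager, and force_rollback=True is handy for tests.

async with user.transaction(force_rollback=True):
    user.name = "temporary name"  # assumed field
    await user.save()
# with force_rollback=True the changes are rolled back when the block exits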

get_columns_for_name

get_columns_for_name(name)
PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/mixins/db.py
def get_columns_for_name(self: "Model", name: str) -> Sequence["sqlalchemy.Column"]:
    table = self.table
    meta = self.meta
    if name in meta.field_to_columns:
        return meta.field_to_columns[name]
    elif name in table.columns:
        return (table.columns[name],)
    else:
        return cast(Sequence["sqlalchemy.Column"], _empty)

identifying_clauses

identifying_clauses(prefix='')
PARAMETER DESCRIPTION
prefix

TYPE: str DEFAULT: ''

Source code in edgy/core/db/models/mixins/db.py
def identifying_clauses(self, prefix: str = "") -> list[Any]:
    # works only if the class of the model is the main class of the queryset
    # TODO: implement prefix handling and return generic column without table attached
    if prefix:
        raise NotImplementedError()
    clauses: list[Any] = []
    for field_name in self.identifying_db_fields:
        field = self.meta.fields.get(field_name)
        if field is not None:
            for column_name, value in field.clean(
                field_name, self.__dict__[field_name]
            ).items():
                clauses.append(getattr(self.table.columns, column_name) == value)
        else:
            clauses.append(
                getattr(self.table.columns, field_name) == self.__dict__[field_name]
            )
    return clauses

generate_proxy_model classmethod

generate_proxy_model()

Generates a proxy model for each model. This proxy model is a simple shallow copy of the original model being generated.

Source code in edgy/core/db/models/model.py
@classmethod
def generate_proxy_model(cls) -> type[Model]:
    """
    Generates a proxy model for each model. This proxy model is a simple
    shallow copy of the original model being generated.
    """
    fields = {key: copy.copy(field) for key, field in cls.meta.fields.items()}

    class MethodHolder(Model):
        pass

    ignore = set(dir(MethodHolder))

    for key in dir(cls):
        if key in ignore or key.startswith("_"):
            continue
        val = inspect.getattr_static(cls, key)
        if inspect.isfunction(val):
            setattr(MethodHolder, key, val)

    proxy_model = ProxyModel(
        name=cls.__name__,
        module=cls.__module__,
        metadata=cls.meta,
        definitions=fields,
        bases=(MethodHolder,),
    )

    proxy_model.build()
    generify_model_fields(cast(type[EdgyBaseModel], proxy_model.model))
    return cast(type[Model], proxy_model.model)

load async

load(only_needed=False)
PARAMETER DESCRIPTION
only_needed

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/mixins/db.py
async def load(self, only_needed: bool = False) -> None:
    if only_needed and self._loaded_or_deleted:
        return
    row = None
    clauses = self.identifying_clauses()
    if clauses:
        # Build the select expression.
        expression = self.table.select().where(*clauses)

        # Perform the fetch.
        check_db_connection(self.database)
        async with self.database as database:
            row = await database.fetch_one(expression)
    # check if is in system
    if row is None:
        raise ObjectNotFound("row does not exist anymore")
    # Update the instance.
    self.__dict__.update(self.transform_input(dict(row._mapping), phase="load", instance=self))
    self._loaded_or_deleted = True
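
A usage sketch (the user instance is an assumption):

# Refresh the instance from the database; raises ObjectNotFound if the row is gone.
await user.load()
# Skip the query when the instance was already loaded or deleted.
await user.load(only_needed=True)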

update async

update(**kwargs)
PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/mixins/db.py
async def update(self: "Model", **kwargs: Any) -> "Model":
    token = EXPLICIT_SPECIFIED_VALUES.set(set(kwargs.keys()))
    try:
        await self._update(**kwargs)
    finally:
        EXPLICIT_SPECIFIED_VALUES.reset(token)
    return self
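
A usage sketch (the field names are assumptions): update performs a partial update and marks exactly the passed keyword arguments as explicitly specified values.

await user.update(name="Edgy", language="EN")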

save async

save(force_insert=False, values=None, force_save=None)

Performs a save of a given model instance. When saving, it makes sure to either update the existing row or create a new one.

PARAMETER DESCRIPTION
force_insert

TYPE: bool DEFAULT: False

values

TYPE: Union[dict[str, Any], set[str], None] DEFAULT: None

force_save

TYPE: Optional[bool] DEFAULT: None

Source code in edgy/core/db/models/mixins/db.py
async def save(
    self: "Model",
    force_insert: bool = False,
    values: Union[dict[str, Any], set[str], None] = None,
    force_save: Optional[bool] = None,
) -> "Model":
    """
    Performs a save of a given model instance.
    When creating a user it will make sure it can update existing or
    create a new one.
    """
    if force_save is not None:
        warnings.warn(
            "'force_save' is deprecated in favor of 'force_insert'",
            DeprecationWarning,
            stacklevel=2,
        )
        force_insert = force_save

    await self.meta.signals.pre_save.send_async(self.__class__, instance=self)

    extracted_fields = self.extract_db_fields()
    if values is None:
        explicit_values: set[str] = set()
    elif isinstance(values, set):
        # special mode for marking values as explicit values
        explicit_values = set(values)
        values = None
    else:
        explicit_values = set(values.keys())

    token = MODEL_GETATTR_BEHAVIOR.set("coro")
    try:
        for pkcolumn in self.__class__.pkcolumns:
            # should trigger load in case of identifying_db_fields
            value = getattr(self, pkcolumn, None)
            if inspect.isawaitable(value):
                value = await value
            if value is None and self.table.columns[pkcolumn].autoincrement:
                extracted_fields.pop(pkcolumn, None)
                force_insert = True
            field = self.meta.fields.get(pkcolumn)
            # this is an IntegerField with primary_key set
            if field is not None and getattr(field, "increment_on_save", 0) != 0:
                # we create a new revision.
                force_insert = True
                # Note: we definitely want this because it is easy to forget a force_insert
    finally:
        MODEL_GETATTR_BEHAVIOR.reset(token)

    token2 = EXPLICIT_SPECIFIED_VALUES.set(explicit_values)
    try:
        if force_insert:
            if values:
                extracted_fields.update(values)
            # force save must ensure a complete mapping
            await self._insert(**extracted_fields)
        else:
            await self._update(**(extracted_fields if values is None else values))
    finally:
        EXPLICIT_SPECIFIED_VALUES.reset(token2)
    await self.meta.signals.post_save.send_async(self.__class__, instance=self)
    return self
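
A usage sketch (instance and field names are assumptions): values restricts an update to specific fields, while force_insert=True always issues an INSERT.

await user.save()                         # update the existing row or insert a new one
await user.save(values={"name": "Edgy"})  # update only the listed values
await user.save(force_insert=True)        # force an INSERT with the extracted fields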

delete async

delete(skip_post_delete_hooks=False, remove_referenced_call=False)

Delete operation from the database

PARAMETER DESCRIPTION
skip_post_delete_hooks

TYPE: bool DEFAULT: False

remove_referenced_call

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/mixins/db.py
async def delete(
    self, skip_post_delete_hooks: bool = False, remove_referenced_call: bool = False
) -> None:
    """Delete operation from the database"""
    await self.meta.signals.pre_delete.send_async(self.__class__, instance=self)
    # get values before deleting
    field_values: dict[str, Any] = {}
    if not skip_post_delete_hooks and self.meta.post_delete_fields:
        token = MODEL_GETATTR_BEHAVIOR.set("coro")
        try:
            for field_name in self.meta.post_delete_fields:
                try:
                    field_value = getattr(self, field_name)
                except AttributeError:
                    continue
                if inspect.isawaitable(field_value):
                    try:
                        field_value = await field_value
                    except AttributeError:
                        continue
                field_values[field_name] = field_value
        finally:
            MODEL_GETATTR_BEHAVIOR.reset(token)
    clauses = self.identifying_clauses()
    if clauses:
        expression = self.table.delete().where(*clauses)
        check_db_connection(self.database)
        async with self.database as database:
            await database.execute(expression)
    # we cannot load anymore
    self._loaded_or_deleted = True
    # now cleanup with the saved values
    for field_name, value in field_values.items():
        field = self.meta.fields[field_name]
        await field.post_delete_callback(value, instance=self)

    await self.meta.signals.post_delete.send_async(self.__class__, instance=self)
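
A usage sketch (user and other are assumed instances):

await user.delete()                              # delete the row, run delete signals and hooks
await other.delete(skip_post_delete_hooks=True)  # skip the per-field post-delete callbacks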

load_recursive async

load_recursive(only_needed=False, only_needed_nest=False, _seen=None)
PARAMETER DESCRIPTION
only_needed

TYPE: bool DEFAULT: False

only_needed_nest

TYPE: bool DEFAULT: False

_seen

TYPE: Optional[set[Any]] DEFAULT: None

Source code in edgy/core/db/models/base.py
async def load_recursive(
    self,
    only_needed: bool = False,
    only_needed_nest: bool = False,
    _seen: Optional[set[Any]] = None,
) -> None:
    if _seen is None:
        _seen = {self.create_model_key()}
    else:
        model_key = self.create_model_key()
        if model_key in _seen:
            return
        else:
            _seen.add(model_key)
    _loaded_or_deleted = self._loaded_or_deleted
    if self.can_load:
        await self.load(only_needed)
    if only_needed_nest and _loaded_or_deleted:
        return
    for field_name in self.meta.foreign_key_fields:
        value = getattr(self, field_name, None)
        if value is not None:
            # if a subinstance is fully loaded stop
            await value.load_recursive(
                only_needed=only_needed, only_needed_nest=True, _seen=_seen
            )
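
A usage sketch: load the instance and then its foreign-key targets recursively; already seen models are skipped via the internal _seen set.

await user.load_recursive()
await user.load_recursive(only_needed=True)  # skip instances that were already loaded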

model_dump

model_dump(show_pk=None, **kwargs)

An updated version of the model dump. It can always show the pk, handles the exclude attribute on fields correctly, and contains the custom logic for fields with getters.

PARAMETER DESCRIPTION
show_pk

TYPE: Union[bool, None] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Extra Args

show_pk: bool - Enforces showing the primary key in the model_dump.

Source code in edgy/core/db/models/base.py
def model_dump(self, show_pk: Union[bool, None] = None, **kwargs: Any) -> dict[str, Any]:
    """
    An updated version of the model dump.
    It can show the pk always and handles the exclude attribute on fields correctly and
    contains the custom logic for fields with getters

    Extra Args:
        show_pk: bool - Enforces showing the primary key in the model_dump.
    """
    # we want a copy
    exclude: Union[set[str], dict[str, Any], None] = kwargs.pop("exclude", None)
    if exclude is None:
        initial_full_field_exclude = _empty
        # must be writable
        exclude = set()
    elif isinstance(exclude, dict):
        initial_full_field_exclude = {k for k, v in exclude.items() if v is True}
        exclude = copy.copy(exclude)
    else:
        initial_full_field_exclude = set(exclude)
        exclude = copy.copy(initial_full_field_exclude)

    if isinstance(exclude, dict):
        # exclude __show_pk__ attribute from showing up
        exclude["__show_pk__"] = True
        for field_name in self.meta.excluded_fields:
            exclude[field_name] = True
    else:
        exclude.update(self.meta.special_getter_fields)
        exclude.update(self.meta.excluded_fields)
        # exclude __show_pk__ attribute from showing up
        exclude.add("__show_pk__")
    include: Union[set[str], dict[str, Any], None] = kwargs.pop("include", None)
    mode: Union[Literal["json", "python"], str] = kwargs.pop("mode", "python")

    should_show_pk = self.__show_pk__ if show_pk is None else show_pk
    model = super().model_dump(exclude=exclude, include=include, mode=mode, **kwargs)
    # Workaround for metafields, computed field logic introduces many problems
    # so reimplement the logic here
    for field_name in self.meta.special_getter_fields:
        if field_name == "pk":
            continue
        if not should_show_pk or field_name not in self.pknames:
            if field_name in initial_full_field_exclude:
                continue
            if include is not None and field_name not in include:
                continue
            if getattr(field_name, "exclude", False):
                continue
        field: BaseFieldType = self.meta.fields[field_name]
        try:
            retval = field.__get__(self, self.__class__)
        except AttributeError:
            continue
        sub_include = None
        if isinstance(include, dict):
            sub_include = include.get(field_name, None)
            if sub_include is True:
                sub_include = None
        sub_exclude = None
        if isinstance(exclude, dict):
            sub_exclude = exclude.get(field_name, None)
            if sub_exclude is True:
                sub_exclude = None
        if isinstance(retval, BaseModel):
            retval = retval.model_dump(
                include=sub_include, exclude=sub_exclude, mode=mode, **kwargs
            )
        else:
            assert (
                sub_include is None
            ), "sub include filters for CompositeField specified, but no Pydantic model is set"
            assert (
                sub_exclude is None
            ), "sub exclude filters for CompositeField specified, but no Pydantic model is set"
            if mode == "json" and not getattr(field, "unsafe_json_serialization", False):
                # skip field if it isn't a BaseModel and the mode is json and unsafe_json_serialization is not set
                # currently unsafe_json_serialization exists only on CompositeFields
                continue
        alias: str = field_name
        if getattr(field, "serialization_alias", None):
            alias = cast(str, field.serialization_alias)
        elif getattr(field, "alias", None):
            alias = field.alias
        model[alias] = retval
    # proxyModel? cause excluded fields to reappear
    # TODO: find a better bugfix
    for excluded_field in self.meta.excluded_fields:
        model.pop(excluded_field, None)
    return model
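
A usage sketch (the excluded field name is an assumption):

data = user.model_dump(show_pk=True, exclude={"password"})  # always include the primary key
payload = user.model_dump(mode="json")                      # json-compatible values only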

build classmethod

build(schema=None, metadata=None)

The inspection is done in an async manner and reflects the objects from the database.

PARAMETER DESCRIPTION
schema

TYPE: Optional[str] DEFAULT: None

metadata

TYPE: Optional[MetaData] DEFAULT: None

Source code in edgy/core/db/models/mixins/reflection.py
@classmethod
def build(
    cls, schema: Optional[str] = None, metadata: Optional[sqlalchemy.MetaData] = None
) -> Any:
    """
    The inspect is done in an async manner and reflects the objects from the database.
    """
    registry = cls.meta.registry
    assert registry, "registry is not set"
    if metadata is None:
        metadata = registry.metadata_by_url[str(cls.database.url)]
    schema_name = schema or cls.get_active_class_schema()

    tablename: str = cast("str", cls.meta.tablename)
    return run_sync(cls.reflect(cls.database, tablename, metadata, schema_name))

execute_post_save_hooks async

execute_post_save_hooks(fields, force_insert)
PARAMETER DESCRIPTION
fields

TYPE: Sequence[str]

force_insert

TYPE: bool

Source code in edgy/core/db/models/base.py
async def execute_post_save_hooks(self, fields: Sequence[str], force_insert: bool) -> None:
    affected_fields = self.meta.post_save_fields.intersection(fields)
    if affected_fields:
        # don't trigger loads, AttributeErrors are used for skipping fields
        token = MODEL_GETATTR_BEHAVIOR.set("passdown")
        token2 = CURRENT_MODEL_INSTANCE.set(self)
        try:
            for field_name in affected_fields:
                field = self.meta.fields[field_name]
                try:
                    value = getattr(self, field_name)
                except AttributeError:
                    continue
                await field.post_save_callback(value, instance=self, force_insert=force_insert)
        finally:
            MODEL_GETATTR_BEHAVIOR.reset(token)
            CURRENT_MODEL_INSTANCE.reset(token2)

execute_pre_save_hooks async

execute_pre_save_hooks(column_values, original, force_insert)
PARAMETER DESCRIPTION
column_values

TYPE: dict[str, Any]

original

TYPE: dict[str, Any]

force_insert

TYPE: bool

Source code in edgy/core/db/models/base.py
async def execute_pre_save_hooks(
    self, column_values: dict[str, Any], original: dict[str, Any], force_insert: bool
) -> dict[str, Any]:
    # also handle defaults
    keys = {*column_values.keys(), *original.keys()}
    affected_fields = self.meta.pre_save_fields.intersection(keys)
    retdict: dict[str, Any] = {}
    if affected_fields:
        # don't trigger loads
        token = MODEL_GETATTR_BEHAVIOR.set("passdown")
        token2 = CURRENT_MODEL_INSTANCE.set(self)
        try:
            for field_name in affected_fields:
                if field_name not in column_values and field_name not in original:
                    continue
                field = self.meta.fields[field_name]
                retdict.update(
                    await field.pre_save_callback(
                        column_values.get(field_name),
                        original.get(field_name),
                        force_insert=force_insert,
                        instance=self,
                    )
                )
        finally:
            MODEL_GETATTR_BEHAVIOR.reset(token)
            CURRENT_MODEL_INSTANCE.reset(token2)
    return retdict

extract_column_values classmethod

extract_column_values(extracted_values, is_update=False, is_partial=False, phase='', instance=None, model_instance=None)
PARAMETER DESCRIPTION
extracted_values

TYPE: dict[str, Any]

is_update

TYPE: bool DEFAULT: False

is_partial

TYPE: bool DEFAULT: False

phase

TYPE: str DEFAULT: ''

instance

TYPE: Optional[Union[BaseModelType, QuerySet]] DEFAULT: None

model_instance

TYPE: Optional[BaseModelType] DEFAULT: None

Source code in edgy/core/db/models/base.py
@classmethod
def extract_column_values(
    cls,
    extracted_values: dict[str, Any],
    is_update: bool = False,
    is_partial: bool = False,
    phase: str = "",
    instance: Optional[Union[BaseModelType, QuerySet]] = None,
    model_instance: Optional[BaseModelType] = None,
) -> dict[str, Any]:
    validated: dict[str, Any] = {}
    token = CURRENT_PHASE.set(phase)
    token2 = CURRENT_INSTANCE.set(instance)
    token3 = CURRENT_MODEL_INSTANCE.set(model_instance)
    try:
        # phase 1: transform when required
        if cls.meta.input_modifying_fields:
            extracted_values = {**extracted_values}
            for field_name in cls.meta.input_modifying_fields:
                cls.meta.fields[field_name].modify_input(field_name, extracted_values)
        # phase 2: validate fields and set defaults for readonly
        need_second_pass: list[BaseFieldType] = []
        for field_name, field in cls.meta.fields.items():
            if field.read_only:
                # if read_only, updates are not possible anymore
                if (
                    not is_partial or (field.inject_default_on_partial_update and is_update)
                ) and field.has_default():
                    validated.update(field.get_default_values(field_name, validated))
                continue
            if field_name in extracted_values:
                item = extracted_values[field_name]
                assert field.owner
                for sub_name, value in field.clean(field_name, item).items():
                    if sub_name in validated:
                        raise ValueError(f"value set twice for key: {sub_name}")
                    validated[sub_name] = value
            elif (
                not is_partial or (field.inject_default_on_partial_update and is_update)
            ) and field.has_default():
                # add field without a value to the second pass (in case no value appears)
                # only include fields which have inject_default_on_partial_update set or if not is_partial
                need_second_pass.append(field)

        # phase 3: set defaults for the rest if not partial or inject_default_on_partial_update
        if need_second_pass:
            for field in need_second_pass:
                # check if field appeared e.g. by composite
                # Note: default values are directly passed without validation
                if field.name not in validated:
                    validated.update(field.get_default_values(field.name, validated))
    finally:
        CURRENT_MODEL_INSTANCE.reset(token3)
        CURRENT_INSTANCE.reset(token2)
        CURRENT_PHASE.reset(token)
    return validated

extract_db_fields

extract_db_fields(only=None)

Extracts all the db fields, model references, and regular fields. Related fields are not included because they are disjoint.

PARAMETER DESCRIPTION
only

TYPE: Optional[Sequence[str]] DEFAULT: None

Source code in edgy/core/db/models/types.py
def extract_db_fields(self, only: Optional[Sequence[str]] = None) -> dict[str, Any]:
    """
    Extracts all the db fields, model references and fields.
    Related fields are not included because they are disjoint.
    """
    fields = self.meta.fields
    columns = self.table.columns

    if only is not None:
        return {k: v for k, v in self.__dict__.items() if k in only}

    return {k: v for k, v in self.__dict__.items() if k in fields or hasattr(columns, k)}

get_instance_name

get_instance_name()

Returns the name of the class in lowercase.

Source code in edgy/core/db/models/types.py
def get_instance_name(self) -> str:
    """
    Returns the name of the class in lowercase.
    """
    return self.__class__.__name__.lower()

create_model_key

create_model_key()

Build a cache key for the model.

Source code in edgy/core/db/models/types.py
def create_model_key(self) -> tuple:
    """
    Build a cache key for the model.
    """
    pk_key_list: list[Any] = [type(self).__name__]
    # there are no columns, only column results
    for attr in self.pkcolumns:
        pk_key_list.append(str(getattr(self, attr)))
    return tuple(pk_key_list)

transform_input classmethod

transform_input(kwargs, phase='', instance=None)

Expand to_models and apply input modifications.

PARAMETER DESCRIPTION
kwargs

TYPE: Any

phase

TYPE: str DEFAULT: ''

instance

TYPE: Optional[BaseModelType] DEFAULT: None

Source code in edgy/core/db/models/base.py
@classmethod
def transform_input(
    cls,
    kwargs: Any,
    phase: str = "",
    instance: Optional[BaseModelType] = None,
) -> Any:
    """
    Expand to_models and apply input modifications.
    """

    kwargs = {**kwargs}
    new_kwargs: dict[str, Any] = {}

    fields = cls.meta.fields
    token = CURRENT_INSTANCE.set(instance)
    token2 = CURRENT_MODEL_INSTANCE.set(instance)
    token3 = CURRENT_PHASE.set(phase)
    try:
        # phase 1: transform
        # Note: this is order dependent. There should be no overlap.
        for field_name in cls.meta.input_modifying_fields:
            fields[field_name].modify_input(field_name, kwargs)
        # phase 2: apply to_model
        for key, value in kwargs.items():
            field = fields.get(key, None)
            if field is not None:
                new_kwargs.update(**field.to_model(key, value))
            else:
                new_kwargs[key] = value
    finally:
        CURRENT_PHASE.reset(token3)
        CURRENT_MODEL_INSTANCE.reset(token2)
        CURRENT_INSTANCE.reset(token)
    return new_kwargs

model_dump_json

model_dump_json(**kwargs)
PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/base.py
def model_dump_json(self, **kwargs: Any) -> str:
    return orjson.dumps(self.model_dump(mode="json", **kwargs)).decode()

__setattr__

__setattr__(key, value)
PARAMETER DESCRIPTION
key

TYPE: str

value

TYPE: Any

Source code in edgy/core/db/models/base.py
def __setattr__(self, key: str, value: Any) -> None:
    fields = self.meta.fields
    field = fields.get(key, None)
    token = CURRENT_INSTANCE.set(self)
    token2 = CURRENT_PHASE.set("set")
    try:
        if field is not None:
            if hasattr(field, "__set__"):
                # not recommended, better to use to_model instead except for kept objects
                # used in related_fields to mask and not to implement to_model
                field.__set__(self, value)
            else:
                for k, v in field.to_model(key, value).items():
                    if k in self.model_fields:
                        # __dict__ is updated and validator is executed
                        super().__setattr__(k, v)
                    else:
                        # bypass __setattr__ method
                        # ensures, __dict__ is updated
                        object.__setattr__(self, k, v)
        elif key in self.model_fields:
            # __dict__ is updated and validator is executed
            super().__setattr__(key, value)
        else:
            # bypass __setattr__ method
            # ensures, __dict__ is updated
            object.__setattr__(self, key, value)
    finally:
        CURRENT_INSTANCE.reset(token)
        CURRENT_PHASE.reset(token2)

_agetattr_helper async

_agetattr_helper(name, getter)
PARAMETER DESCRIPTION
name

TYPE: str

getter

TYPE: Any

Source code in edgy/core/db/models/base.py
async def _agetattr_helper(self, name: str, getter: Any) -> Any:
    await self.load()
    if getter is not None:
        token = MODEL_GETATTR_BEHAVIOR.set("coro")
        try:
            result = getter(self, self.__class__)
            if inspect.isawaitable(result):
                result = await result
            return result
        finally:
            MODEL_GETATTR_BEHAVIOR.reset(token)
    try:
        return self.__dict__[name]
    except KeyError:
        raise AttributeError(f"Attribute: {name} not found") from None

__getattr__

__getattr__(name)

Does the following things:

1. Initializes managers on access.
2. Redirects get accesses to getter fields.
3. Runs a one-off query to populate any foreign key, making sure it runs only once per foreign key, avoiding multiple database calls.

PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
def __getattr__(self, name: str) -> Any:
    """
    Does following things
    1. Initialize managers on access
    2. Redirects get accesses to getter fields
    3. Run an one off query to populate any foreign key making sure
       it runs only once per foreign key avoiding multiple database calls.
    """
    behavior = MODEL_GETATTR_BEHAVIOR.get()
    manager = self.meta.managers.get(name)
    if manager is not None:
        if name not in self.__dict__:
            manager = copy.copy(manager)
            manager.instance = self
            self.__dict__[name] = manager
        return self.__dict__[name]

    field = self.meta.fields.get(name)
    getter: Any = None
    if field is not None and hasattr(field, "__get__"):
        getter = field.__get__
        if behavior == "coro" or behavior == "passdown":
            return field.__get__(self, self.__class__)
        else:
            token = MODEL_GETATTR_BEHAVIOR.set("passdown")
            # no need to set a descriptor object
            try:
                return field.__get__(self, self.__class__)
            except AttributeError:
                # forward to load routine
                pass
            finally:
                MODEL_GETATTR_BEHAVIOR.reset(token)
    if (
        name not in self.__dict__
        and behavior != "passdown"
        and not self.__dict__.get("_loaded_or_deleted", False)
        and (field is not None or self.__reflected__)
        and name not in self.identifying_db_fields
        and self.can_load
    ):
        coro = self._agetattr_helper(name, getter)
        if behavior == "coro":
            return coro
        return run_sync(coro)
    return super().__getattr__(name)

__eq__

__eq__(other)
PARAMETER DESCRIPTION
other

TYPE: Any

Source code in edgy/core/db/models/base.py
def __eq__(self, other: Any) -> bool:
    # if self.__class__ != other.__class__:
    #     return False
    # somehow meta gets regenerated, so just compare tablename and registry.
    if self.meta.registry is not other.meta.registry:
        return False
    if self.meta.tablename != other.meta.tablename:
        return False
    self_dict = self.extract_column_values(
        self.extract_db_fields(self.pkcolumns),
        is_partial=True,
        phase="compare",
        instance=self,
        model_instance=self,
    )
    other_dict = other.extract_column_values(
        other.extract_db_fields(self.pkcolumns),
        is_partial=True,
        phase="compare",
        instance=other,
        model_instance=other,
    )
    key_set = {*self_dict.keys(), *other_dict.keys()}
    for field_name in key_set:
        if self_dict.get(field_name) != other_dict.get(field_name):
            return False
    return True

add_to_registry classmethod

add_to_registry(registry, name='', database='keep', replace_related_field=False)
PARAMETER DESCRIPTION
registry

TYPE: Registry

name

TYPE: str DEFAULT: ''

database

TYPE: Union[bool, Database, Literal['keep']] DEFAULT: 'keep'

replace_related_field

TYPE: Union[bool, type[BaseModelType]] DEFAULT: False

Source code in edgy/core/db/models/mixins/db.py
@classmethod
def add_to_registry(
    cls: type["BaseModelType"],
    registry: "Registry",
    name: str = "",
    database: Union[bool, "Database", Literal["keep"]] = "keep",
    replace_related_field: Union[bool, type["BaseModelType"]] = False,
) -> None:
    # when called if registry is not set
    cls.meta.registry = registry
    if database is True:
        cls.database = registry.database
    elif database is not False:
        if database == "keep":
            if getattr(cls, "database", None) is None:
                cls.database = registry.database

        else:
            cls.database = database
    meta = cls.meta
    if name:
        cls.__name__ = name

    # Making sure it does not generate models if abstract or a proxy
    if not meta.abstract and not cls.__is_proxy_model__:
        if getattr(cls, "__reflected__", False):
            registry.reflected[cls.__name__] = cls
        else:
            registry.models[cls.__name__] = cls
        # after registering the model itself
        for value in list(meta.fields.values()):
            if isinstance(value, BaseManyToManyForeignKeyField):
                m2m_registry: Registry = value.target_registry
                with contextlib.suppress(Exception):
                    m2m_registry = cast("Registry", value.target.registry)

                def create_through_model(x: Any, field: "BaseFieldType" = value) -> None:
                    # we capture with field = ... the variable
                    field.create_through_model()

                m2m_registry.register_callback(value.to, create_through_model, one_time=True)
        # Sets the foreign key fields
        if meta.foreign_key_fields:
            _set_related_name_for_foreign_keys(
                meta, cls, replace_related_field=replace_related_field
            )
        registry.execute_model_callbacks(cls)

    # finalize
    cls.model_rebuild(force=True)
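
A usage sketch (registry and model names are assumptions) showing two alternative calls: register the model with another registry, optionally renaming it and switching it to that registry's database.

MyModel.add_to_registry(other_registry)                                  # keep the currently assigned database
MyModel.add_to_registry(other_registry, name="Renamed", database=True)   # adopt the registry's database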

get_active_instance_schema

get_active_instance_schema(check_schema=True, check_tenant=True)
PARAMETER DESCRIPTION
check_schema

TYPE: bool DEFAULT: True

check_tenant

TYPE: bool DEFAULT: True

Source code in edgy/core/db/models/mixins/db.py
def get_active_instance_schema(
    self, check_schema: bool = True, check_tenant: bool = True
) -> Union[str, None]:
    if self.__using_schema__ is not Undefined:
        return cast(Union[str, None], self.__using_schema__)
    return self.__class__.get_active_class_schema(
        check_schema=check_schema, check_tenant=check_tenant
    )

get_active_class_schema classmethod

get_active_class_schema(check_schema=True, check_tenant=True)
PARAMETER DESCRIPTION
check_schema

TYPE: bool DEFAULT: True

check_tenant

TYPE: bool DEFAULT: True

Source code in edgy/core/db/models/mixins/db.py
@classmethod
def get_active_class_schema(cls, check_schema: bool = True, check_tenant: bool = True) -> str:
    if cls.__using_schema__ is not Undefined:
        return cast(Union[str, None], cls.__using_schema__)
    if check_schema:
        schema = get_schema(check_tenant=check_tenant)
        if schema is not None:
            return schema
    db_schema: Optional[str] = cls.get_db_schema()
    # sometimes "" is ok, sometimes not, sqlalchemy logic
    return db_schema or None
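
A sketch of how schema resolution can be steered (the schema names are assumptions): an explicit __using_schema__ on the instance or class takes precedence over the contextual schema and the db schema.

user.__using_schema__ = "tenant_a"
assert user.get_active_instance_schema() == "tenant_a"

MyModel.__using_schema__ = "shared"
assert MyModel.get_active_class_schema() == "shared"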

copy_edgy_model classmethod

copy_edgy_model(registry=None, name='', **kwargs)

Copy the model class and optionally add it to another registry.

PARAMETER DESCRIPTION
registry

TYPE: Optional[Registry] DEFAULT: None

name

TYPE: str DEFAULT: ''

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/mixins/db.py
@classmethod
def copy_edgy_model(
    cls: type["Model"], registry: Optional["Registry"] = None, name: str = "", **kwargs: Any
) -> type["Model"]:
    """Copy the model class and optionally add it to another registry."""
    # removes private pydantic stuff, except the prefixed ones
    attrs = {
        key: val for key, val in cls.__dict__.items() if key not in cls._removed_copy_keys
    }
    # managers and fields are gone, we have to readd them with the correct data
    attrs.update(
        (
            (field_name, field)
            for field_name, field in cls.meta.fields.items()
            if not field.no_copy
        )
    )
    attrs.update(cls.meta.managers)
    _copy = cast(
        type["Model"],
        type(cls.__name__, cls.__bases__, attrs, skip_registry=True, **kwargs),
    )
    for field_name in _copy.meta.foreign_key_fields:
        # we need to unreference and check if both models are in the same registry
        if cls.meta.fields[field_name].target.meta.registry is cls.meta.registry:
            _copy.meta.fields[field_name].target = cls.meta.fields[field_name].target.__name__
        else:
            # otherwise we need to disable backrefs
            _copy.meta.fields[field_name].target.related_name = False
    if name:
        _copy.__name__ = name
    if registry is not None:
        # replace when old class otherwise old references can lead to issues
        _copy.add_to_registry(registry, replace_related_field=cls)
    return _copy
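
A usage sketch (names are assumptions): copy a model class, optionally renaming it and adding the copy to another registry.

OtherUser = User.copy_edgy_model(registry=other_registry, name="OtherUser")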

_update async

_update(**kwargs)

Update operation of the database fields.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/mixins/db.py
async def _update(self: "Model", **kwargs: Any) -> Any:
    """
    Update operation of the database fields.
    """
    await self.meta.signals.pre_update.send_async(self.__class__, instance=self)
    column_values = self.extract_column_values(
        extracted_values=kwargs,
        is_partial=True,
        is_update=True,
        phase="prepare_update",
        instance=self,
        model_instance=self,
    )
    # empty updates shouldn't cause an error. E.g. only model references are updated
    clauses = self.identifying_clauses()
    token = CURRENT_INSTANCE.set(self)
    try:
        if column_values and clauses:
            check_db_connection(self.database, stacklevel=4)
            async with self.database as database, database.transaction():
                # can update column_values
                column_values.update(
                    await self.execute_pre_save_hooks(
                        column_values, kwargs, force_insert=False
                    )
                )
                expression = self.table.update().values(**column_values).where(*clauses)
                await database.execute(expression)

            # Update the model instance.
            new_kwargs = self.transform_input(
                column_values, phase="post_update", instance=self
            )
            self.__dict__.update(new_kwargs)

        # updates aren't required to change the db, they can also just affect the meta fields
        await self.execute_post_save_hooks(
            cast(Sequence[str], kwargs.keys()), force_insert=False
        )

    finally:
        CURRENT_INSTANCE.reset(token)
    if column_values or kwargs:
        # Ensure on access refresh the results is active
        self._loaded_or_deleted = False
    await self.meta.signals.post_update.send_async(self.__class__, instance=self)

_insert async

_insert(**kwargs)

Performs the save instruction.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/mixins/db.py
async def _insert(self: "Model", **kwargs: Any) -> "Model":
    """
    Performs the save instruction.
    """
    column_values: dict[str, Any] = self.extract_column_values(
        extracted_values=kwargs,
        is_partial=False,
        is_update=False,
        phase="prepare_insert",
        instance=self,
        model_instance=self,
    )
    check_db_connection(self.database, stacklevel=4)
    token = CURRENT_INSTANCE.set(self)
    try:
        async with self.database as database, database.transaction():
            # can update column_values
            column_values.update(
                await self.execute_pre_save_hooks(column_values, kwargs, force_insert=True)
            )
            expression = self.table.insert().values(**column_values)
            autoincrement_value = await database.execute(expression)
        # sqlalchemy supports only one autoincrement column
        if autoincrement_value:
            column = self.table.autoincrement_column
            if column is not None and hasattr(autoincrement_value, "_mapping"):
                autoincrement_value = autoincrement_value._mapping[column.key]
            # can be explicitly set, which causes an invalid value to be returned
            if column is not None and column.key not in column_values:
                column_values[column.key] = autoincrement_value

        new_kwargs = self.transform_input(column_values, phase="post_insert", instance=self)
        self.__dict__.update(new_kwargs)

        if self.meta.post_save_fields:
            await self.execute_post_save_hooks(
                cast(Sequence[str], kwargs.keys()), force_insert=True
            )
    finally:
        CURRENT_INSTANCE.reset(token)
    # Ensure on access refresh the results is active
    self._loaded_or_deleted = False

    return self

_get_unique_constraints classmethod

_get_unique_constraints(fields)

Returns the unique constraints for the model.

The columns must be a list, a tuple of strings, or a UniqueConstraint object.

:return: Model UniqueConstraint.

PARAMETER DESCRIPTION
fields

TYPE: Union[Sequence, str, UniqueConstraint]

Source code in edgy/core/db/models/mixins/db.py
@classmethod
def _get_unique_constraints(
    cls, fields: Union[Sequence, str, sqlalchemy.UniqueConstraint]
) -> Optional[sqlalchemy.UniqueConstraint]:
    """
    Returns the unique constraints for the model.

    The columns must be a list, a tuple of strings, or a UniqueConstraint object.

    :return: Model UniqueConstraint.
    """
    if isinstance(fields, str):
        return sqlalchemy.UniqueConstraint(*cls.meta.field_to_column_names[fields])
    elif isinstance(fields, UniqueConstraint):
        return sqlalchemy.UniqueConstraint(
            *chain.from_iterable(
                cls.meta.field_to_column_names[field] for field in fields.fields
            ),
            name=fields.name,
            deferrable=fields.deferrable,
            initially=fields.initially,
        )
    return sqlalchemy.UniqueConstraint(
        *chain.from_iterable(cls.meta.field_to_column_names[field] for field in fields)
    )

_get_indexes classmethod

_get_indexes(index)

Creates the index based on the Index fields

PARAMETER DESCRIPTION
index

TYPE: Index

Source code in edgy/core/db/models/mixins/db.py
@classmethod
def _get_indexes(cls, index: Index) -> Optional[sqlalchemy.Index]:
    """
    Creates the index based on the Index fields
    """
    return sqlalchemy.Index(
        index.name,
        *chain.from_iterable(
            [field]
            if isinstance(field, sqlalchemy.TextClause)
            else cls.meta.field_to_column_names[field]
            for field in index.fields
        ),
    )

declarative classmethod

declarative()
Source code in edgy/core/db/models/mixins/generics.py
@classmethod
def declarative(cls) -> Any:
    return cls.generate_model_declarative()

generate_model_declarative classmethod

generate_model_declarative()

Transforms a core Edgy table into a Declarative model table.

Source code in edgy/core/db/models/mixins/generics.py
@classmethod
def generate_model_declarative(cls) -> Any:
    """
    Transforms a core Edgy table into a Declarative model table.
    """
    Base = cls.meta.registry.declarative_base

    # Build the original table
    fields = {"__table__": cls.table}

    # Generate base
    model_table = type(cls.__name__, (Base,), fields)

    # Make sure if there are foreignkeys, builds the relationships
    for column in cls.table.columns:
        if not column.foreign_keys:
            continue

        # Maps the relationships with the foreign keys and related names
        field = cls.meta.fields.get(column.name)
        to = field.to.__name__ if inspect.isclass(field.to) else field.to
        mapped_model: Mapped[to] = relationship(to)  # type: ignore

        # Adds to the current model
        model_table.__mapper__.add_property(f"{column.name}_relation", mapped_model)

    return model_table
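
A usage sketch: obtain a plain SQLAlchemy declarative class for interoperability with libraries that expect SQLAlchemy models.

UserDeclarative = User.declarative()  # shortcut for User.generate_model_declarative()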

can_load_from_row classmethod

can_load_from_row(row, table)

Check if a model_class can be loaded from a row for the table.

PARAMETER DESCRIPTION
row

TYPE: Row

table

TYPE: Table

Source code in edgy/core/db/models/mixins/row.py
@classmethod
def can_load_from_row(cls: type["Model"], row: "Row", table: "Table") -> bool:
    """Check if a model_class can be loaded from a row for the table."""

    return bool(
        cls.meta.registry
        and not cls.meta.abstract
        and all(row._mapping.get(f"{table.name}_{col}") is not None for col in cls.pkcolumns)
    )

from_sqla_row async classmethod

from_sqla_row(row, tables_and_models, select_related=None, prefetch_related=None, only_fields=None, is_defer_fields=False, exclude_secrets=False, using_schema=None, database=None, prefix='', old_select_related_value=None)

Class method to convert a SQLAlchemy Row result into an EdgyModel row type.

Loops through the select_related fields if the query comes from a select_related operation, and validates that the select_related and related_field exist inside the models.

When select_related and related_field exist for the same field being validated, the related field is ignored as it won't override the value already collected from the select_related.

If there is no select_related, it goes through the related field, which should only return the instance of the ForeignKey with the ID, making it lazy loaded.

:return: Model class.

PARAMETER DESCRIPTION
row

TYPE: Row

tables_and_models

TYPE: dict[str, tuple[Table, type[BaseModelType]]]

select_related

TYPE: Optional[Sequence[Any]] DEFAULT: None

prefetch_related

TYPE: Optional[Sequence[Prefetch]] DEFAULT: None

only_fields

TYPE: Sequence[str] DEFAULT: None

is_defer_fields

TYPE: bool DEFAULT: False

exclude_secrets

TYPE: bool DEFAULT: False

using_schema

TYPE: Optional[str] DEFAULT: None

database

TYPE: Optional[Database] DEFAULT: None

prefix

TYPE: str DEFAULT: ''

old_select_related_value

TYPE: Optional[Model] DEFAULT: None

Source code in edgy/core/db/models/mixins/row.py
@classmethod
async def from_sqla_row(
    cls: type["Model"],
    row: "Row",
    # contain the mappings used for select
    tables_and_models: dict[str, tuple["Table", type["BaseModelType"]]],
    select_related: Optional[Sequence[Any]] = None,
    prefetch_related: Optional[Sequence["Prefetch"]] = None,
    only_fields: Sequence[str] = None,
    is_defer_fields: bool = False,
    exclude_secrets: bool = False,
    using_schema: Optional[str] = None,
    database: Optional["Database"] = None,
    prefix: str = "",
    old_select_related_value: Optional["Model"] = None,
) -> Optional["Model"]:
    """
    Class method to convert a SQLAlchemy Row result into an EdgyModel row type.

    Looping through select_related fields if the query comes from a select_related operation.
    Validates if exists the select_related and related_field inside the models.

    When select_related and related_field exist for the same field being validated, the related
    field is ignored as it won't override the value already collected from the select_related.

    If there is no select_related, then goes through the related field where it **should**
    only return the instance of the ForeignKey with the ID, making it lazy loaded.

    :return: Model class.
    """
    item: dict[str, Any] = {}
    select_related = select_related or []
    prefetch_related = prefetch_related or []
    secret_columns: set[str] = set()
    if exclude_secrets:
        for name in cls.meta.secret_fields:
            secret_columns.update(cls.meta.field_to_column_names[name])

    for related in select_related:
        field_name = related.split("__", 1)[0]
        try:
            field = cls.meta.fields[field_name]
        except KeyError:
            raise QuerySetError(
                detail=f'Selected field "{field_name}" does not exist on {cls}.'
            ) from None
        if isinstance(field, RelationshipField):
            model_class, _, remainder = field.traverse_field(related)
        else:
            raise QuerySetError(
                detail=f'Selected field "{field_name}" is not a RelationshipField on {cls}.'
            ) from None

        _prefix = field_name if not prefix else f"{prefix}__{field_name}"
        # stop selecting when None. Related models are not available.
        if not model_class.can_load_from_row(
            row,
            tables_and_models[_prefix][0],
        ):
            continue

        if remainder:
            # don't pass table, it is only for the main model_class
            item[field_name] = await model_class.from_sqla_row(
                row,
                tables_and_models=tables_and_models,
                select_related=[remainder],
                prefetch_related=prefetch_related,
                exclude_secrets=exclude_secrets,
                is_defer_fields=is_defer_fields,
                using_schema=using_schema,
                database=database,
                prefix=_prefix,
                old_select_related_value=item.get(field_name),
            )
        else:
            # don't pass table, it is only for the main model_class
            item[field_name] = await model_class.from_sqla_row(
                row,
                tables_and_models=tables_and_models,
                exclude_secrets=exclude_secrets,
                is_defer_fields=is_defer_fields,
                using_schema=using_schema,
                database=database,
                prefix=_prefix,
                old_select_related_value=item.get(field_name),
            )
    # don't overwrite, update with new values and return
    if old_select_related_value:
        for k, v in item.items():
            setattr(old_select_related_value, k, v)
        return old_select_related_value
    table_columns = tables_and_models[prefix][0].columns
    # Populate the related names
    # Making sure if the model being queried is not inside a select related
    # This way it is not overwritten by any value
    for related in cls.meta.foreign_key_fields:
        foreign_key = cls.meta.fields[related]
        ignore_related: bool = cls.__should_ignore_related_name(related, select_related)
        if ignore_related or related in cls.meta.secret_fields:
            continue
        if related in item:
            continue

        if exclude_secrets and foreign_key.secret:
            continue
        columns_to_check = foreign_key.get_column_names(related)

        model_related = foreign_key.target

        child_item = {}
        for column_name in columns_to_check:
            column = getattr(table_columns, column_name, None)
            if column is None:
                continue
            columnkeyhash = column_name
            if prefix:
                columnkeyhash = f"{tables_and_models[prefix][0].name}_{column.key}"

            if columnkeyhash in row._mapping:
                child_item[foreign_key.from_fk_field_name(related, column_name)] = (
                    row._mapping[columnkeyhash]
                )
        # Make sure we generate a temporary reduced model
        # for the related fields. We simply change the structure of the model
        # and rebuild it with the new fields.
        proxy_model = model_related.proxy_model(**child_item)
        proxy_database = database if model_related.database is cls.database else None
        # don't pass a table. It is not in the row (select related path) and has no explicit table
        proxy_model = apply_instance_extras(
            proxy_model,
            model_related,
            using_schema,
            database=proxy_database,
        )
        proxy_model.identifying_db_fields = foreign_key.related_columns

        item[related] = proxy_model

    # Check for the only_fields
    # Pull out the regular column values.
    for column in table_columns:
        if (
            only_fields
            and prefix not in only_fields
            and (f"{prefix}__{column.key}" if prefix else column.key) not in only_fields
        ):
            continue
        if column.key in secret_columns:
            continue
        if column.key not in cls.meta.columns_to_field:
            continue
        # skip if already set, e.g. by a single-column foreign key
        if column.key in item:
            continue
        columnkeyhash = column.key
        if prefix:
            columnkeyhash = f"{tables_and_models[prefix][0].name}_{columnkeyhash}"

        if columnkeyhash in row._mapping:
            item[column.key] = row._mapping[columnkeyhash]
    model: Model = (
        cls.proxy_model(**item, __phase__="init_db")  # type: ignore
        if exclude_secrets or is_defer_fields or only_fields
        else cls(**item, __phase__="init_db")
    )
    # Apply the schema to the model
    model = apply_instance_extras(
        model,
        cls,
        using_schema,
        database=database,
        table=tables_and_models[prefix][0],
    )

    if prefetch_related:
        # Handle prefetch related fields.
        await cls.__handle_prefetch_related(
            row=row,
            prefix=prefix,
            model=model,
            tables_and_models=tables_and_models,
            prefetch_related=prefetch_related,
        )
    assert model.pk is not None, model
    return model
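The lazy-loading behaviour described in the docstring is what surfaces at the query level: without select_related, a foreign key attribute is only a reduced proxy built from the key columns present in the row, while select_related lets from_sqla_row build the full related instance from the joined columns. A minimal sketch, assuming illustrative models (User, Profile) and a SQLite URL that are not part of this reference:

# hedged sketch: model names, fields and the database URL are illustrative assumptions
import edgy

registry = edgy.Registry(database="sqlite:///example.db")


class User(edgy.Model):
    id = edgy.IntegerField(primary_key=True)
    name = edgy.CharField(max_length=100)

    class Meta:
        registry = registry


class Profile(edgy.Model):
    id = edgy.IntegerField(primary_key=True)
    user = edgy.ForeignKey(User, on_delete=edgy.CASCADE, related_name="profiles")

    class Meta:
        registry = registry


async def demo() -> None:
    # no select_related: `profile.user` is a reduced proxy holding only the key columns
    profile = await Profile.query.first()
    # with select_related: the joined columns are in the row, so the full instance is built
    profile = await Profile.query.select_related("user").first()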
__should_ignore_related_name classmethod

__should_ignore_related_name(related_name, select_related)

Validates if it should populate the related field if select related is not considered.

PARAMETER DESCRIPTION
related_name

TYPE: str

select_related

TYPE: Sequence[str]

Source code in edgy/core/db/models/mixins/row.py, lines 221-232
@classmethod
def __should_ignore_related_name(
    cls, related_name: str, select_related: Sequence[str]
) -> bool:
    """
    Validates if it should populate the related field if select related is not considered.
    """
    for related_field in select_related:
        fields = related_field.split("__")
        if related_name in fields:
            return True
    return False
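For illustration, the skip rule reduces to a membership test over the double-underscore path segments. A standalone sketch of the same check (plain Python, not edgy API):

# standalone sketch of the rule above (illustrative, not edgy API)
def should_ignore(related_name: str, select_related: list[str]) -> bool:
    return any(related_name in path.split("__") for path in select_related)


assert should_ignore("user", ["user__profile"]) is True   # handled via select_related
assert should_ignore("team", ["user__profile"]) is False  # falls back to a lazy proxy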

create_model_key_from_sqla_row classmethod

create_model_key_from_sqla_row(row, row_prefix='')

Build a cache key for the model.

PARAMETER DESCRIPTION
row

TYPE: Row

row_prefix

TYPE: str DEFAULT: ''

Source code in edgy/core/db/models/mixins/row.py, lines 234-242
@classmethod
def create_model_key_from_sqla_row(cls, row: "Row", row_prefix: str = "") -> tuple:
    """
    Build a cache key for the model.
    """
    pk_key_list: list[Any] = [cls.__name__]
    for attr in cls.pkcolumns:
        pk_key_list.append(str(row._mapping[f"{row_prefix}{attr}"]))
    return tuple(pk_key_list)
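The resulting key is simply the model's class name followed by the stringified primary key values taken from the row mapping, with row_prefix prepended to the column keys for joined rows. A worked sketch with assumed names:

# assumed values: a model named "User" with pkcolumns == ["id"], joined under table "users"
row_mapping = {"users_id": 7}
row_prefix = "users_"
key = ("User", *(str(row_mapping[f"{row_prefix}{attr}"]) for attr in ["id"]))
assert key == ("User", "7")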

__set_prefetch async classmethod

__set_prefetch(row, model, row_prefix, related)
PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Model

row_prefix

TYPE: str

related

TYPE: Prefetch

Source code in edgy/core/db/models/mixins/row.py, lines 244-286
@classmethod
async def __set_prefetch(
    cls,
    row: "Row",
    model: "Model",
    row_prefix: str,
    related: "Prefetch",
) -> None:
    model_key = ()
    if related._is_finished:
        # when force_rollback
        # we can only bake after all rows are retrieved
        # this is why it is here
        await related.init_bake(type(model))
        model_key = model.create_model_key()
    if model_key in related._baked_results:
        setattr(model, related.to_attr, related._baked_results[model_key])
    else:
        crawl_result = crawl_relationship(
            model.__class__, related.related_name, traverse_last=True
        )
        if crawl_result.reverse_path is False:
            raise QuerySetError(
                detail="Creating a reverse path is not possible, unidirectional fields used."
            )
        if crawl_result.cross_db_remainder:
            raise NotImplementedError(
                "Cannot prefetch from other db yet. Maybe in future this feature will be added."
            )
        queryset = related.queryset
        if related._is_finished:
            assert queryset is not None, "Queryset is not set but the _is_finished flag is."
        else:
            check_prefetch_collision(model, related)
            if queryset is None:
                queryset = crawl_result.model_class.query.all()

            queryset = queryset.select_related(cast(str, crawl_result.reverse_path))
        clause = {
            f"{crawl_result.reverse_path}__{pkcol}": row._mapping[f"{row_prefix}{pkcol}"]
            for pkcol in cls.pkcolumns
        }
        setattr(model, related.to_attr, await queryset.filter(clause))
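When no baked result is available, the filter clause pairs the reverse path with this row's primary key values. A standalone sketch of that construction, assuming an empty row_prefix, pkcolumns == ["id"] and a hypothetical reverse path "user":

# illustrative only; "user" and the pk value are assumptions
pkcolumns = ["id"]
row_mapping = {"id": 3}
reverse_path = "user"
clause = {f"{reverse_path}__{pkcol}": row_mapping[pkcol] for pkcol in pkcolumns}
assert clause == {"user__id": 3}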
__handle_prefetch_related async classmethod

__handle_prefetch_related(row, model, prefix, tables_and_models, prefetch_related)

Handles any prefetch related scenario from the model. Loads in advance all the models needed for a specific record

Recursively checks for the related field and validates if there is any conflicting attribute. If there is, a QuerySetError is raised.

PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Model

prefix

TYPE: str

tables_and_models

TYPE: dict[str, tuple[Table, type[BaseModelType]]]

prefetch_related

TYPE: Sequence[Prefetch]

Source code in edgy/core/db/models/mixins/row.py, lines 288-316
@classmethod
async def __handle_prefetch_related(
    cls,
    row: "Row",
    model: "Model",
    prefix: str,
    tables_and_models: dict[str, tuple["Table", type["BaseModelType"]]],
    prefetch_related: Sequence["Prefetch"],
) -> None:
    """
    Handles any prefetch related scenario from the model.
    Loads in advance all the models needed for a specific record

    Recursively checks for the related field and validates if there is any conflicting
    attribute. If there is, a `QuerySetError` is raised.
    """

    queries = []

    for related in prefetch_related:
        # Check for conflicting names
        # Check as early as possible
        check_prefetch_collision(model=model, related=related)
        row_prefix = f"{tables_and_models[prefix][0].name}_" if prefix else ""
        queries.append(
            cls.__set_prefetch(row=row, row_prefix=row_prefix, model=model, related=related)
        )
    if queries:
        await asyncio.gather(*queries)
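At the query level this is driven by prefetch_related. A hedged usage sketch, reusing the assumed User model from the earlier example and a hypothetical "profiles" reverse relation:

# hedged sketch; the "profiles" reverse relation and the attribute name are assumptions
users = await User.query.prefetch_related(
    edgy.Prefetch(related_name="profiles", to_attr="all_profiles")
).all()
# each user now carries `user.all_profiles`; the per-row prefetches run via asyncio.gather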

fields_not_supported_by_table classmethod

fields_not_supported_by_table(table, check_type=True)

Check if the model fields are a subset of the table.

PARAMETER DESCRIPTION
table

TYPE: Table

check_type

TYPE: bool DEFAULT: True

Source code in edgy/core/db/models/mixins/reflection.py, lines 38-58
@classmethod
def fields_not_supported_by_table(
    cls, table: sqlalchemy.Table, check_type: bool = True
) -> set[str]:
    """Check if the model fields are a subset of the table."""
    field_names = set()
    for field_name, field in cls.meta.fields.items():
        field_has_typing_check = not field.skip_reflection_type_check and check_type
        for column in cls.meta.field_to_columns[field_name]:
            if (
                # a string `not in` check is not supported by sqlalchemy, hence .get()
                table.columns.get(column.key) is None
                or (
                    field_has_typing_check
                    and column.type.as_generic().__class__
                    != table.columns[column.key].type.as_generic().__class__
                )
            ):
                field_names.add(field_name)

    return field_names
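The type comparison relies on SQLAlchemy's as_generic(), so dialect-specific column types match their generic counterparts. A minimal standalone illustration (plain SQLAlchemy, no database):

import sqlalchemy

# a dialect-level VARCHAR and a generic String normalize to the same generic class,
# so such a column would not be flagged by the check above
assert (
    sqlalchemy.VARCHAR(255).as_generic().__class__
    is sqlalchemy.String(255).as_generic().__class__
)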

reflect async classmethod

reflect(registry, tablename, metadata, schema=None)

Reflect a table from the database and return its SQLAlchemy Table object.

This method connects to the database using the provided registry, reflects the table with the given name and metadata, and returns the SQLAlchemy Table object.

PARAMETER DESCRIPTION
registry

The registry object containing the database engine.

TYPE: Union[Registry, Database]

tablename

The name of the table to reflect.

TYPE: str

metadata

The SQLAlchemy MetaData object to associate with the reflected table.

TYPE: MetaData

schema

The schema name where the table is located. Defaults to None.

TYPE: Union[str, None] DEFAULT: None

RETURNS DESCRIPTION
Table

sqlalchemy.Table: The reflected SQLAlchemy Table object.

RAISES DESCRIPTION
ImproperlyConfigured

If there is an error during the reflection process.

Source code in edgy/core/db/models/mixins/reflection.py, lines 60-111
@classmethod
async def reflect(
    cls,
    registry: Union["Registry", "Database"],
    tablename: str,
    metadata: sqlalchemy.MetaData,
    schema: Union[str, None] = None,
) -> sqlalchemy.Table:
    """
    Reflect a table from the database and return its SQLAlchemy Table object.

    This method connects to the database using the provided registry, reflects
    the table with the given name and metadata, and returns the SQLAlchemy
    Table object.

    Parameters:
        registry (Registry): The registry object containing the database engine.
        tablename (str): The name of the table to reflect.
        metadata (sqlalchemy.MetaData): The SQLAlchemy MetaData object to associate with the reflected table.
        schema (Union[str, None], optional): The schema name where the table is located. Defaults to None.

    Returns:
        sqlalchemy.Table: The reflected SQLAlchemy Table object.

    Raises:
        ImproperlyConfigured: If there is an error during the reflection process.
    """

    def execute_reflection(connection: sqlalchemy.Connection) -> sqlalchemy.Table:
        """Helper function to create and reflect the table."""
        return sqlalchemy.Table(
            tablename, metadata, schema=schema, autoload_with=connection
        )

    if hasattr(registry, "database"):
        registry = registry.database
    try:
        async with registry as database:
            table: sqlalchemy.Table = await database.run_sync(execute_reflection)
    except Exception as e:
        raise ImproperlyConfigured(detail=str(e)) from e
    unsupported_fields = cls.fields_not_supported_by_table(table)
    if unsupported_fields:
        raise ImproperlyConfigured(
            "Following fields have columns not matching the table specification:",
            ", ".join(unsupported_fields),
        )

    return table
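A hedged usage sketch of reflecting an existing table; the table name, field and database URL below are assumptions, not part of this reference:

# hedged sketch: "users", the `name` field and the database URL are assumptions
import edgy
import sqlalchemy

registry = edgy.Registry(database="sqlite:///existing.db")


class LegacyUser(edgy.ReflectModel):
    # declared fields must match columns on the reflected table
    # (see fields_not_supported_by_table above)
    name = edgy.CharField(max_length=255, null=True)

    class Meta:
        tablename = "users"
        registry = registry


async def reflect_legacy_table() -> sqlalchemy.Table:
    # reflect() accepts a Registry or a Database; here the registry is passed directly
    return await LegacyUser.reflect(registry, "users", sqlalchemy.MetaData())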