Skip to content

ReflectModel class

edgy.ReflectModel

ReflectModel(*args, **kwargs)

Bases: Model, EdgyBaseReflectModel

Reflection on async engines is not yet supported; therefore, we need to make a sync_engine call.

PARAMETER DESCRIPTION
*args

TYPE: Any DEFAULT: ()

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/base.py
64
65
66
67
68
69
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Build the instance from keyword arguments.

    The private ``__show_pk__`` flag is taken out of the kwargs before they
    are passed through `transform_input` (phase "creation"). Positional
    arguments are accepted but not used.
    """
    show_pk_flag = kwargs.pop("__show_pk__", False)
    creation_kwargs = self.transform_input(kwargs, phase="creation")
    super().__init__(**creation_kwargs)
    # the instance dict is replaced by the entries `setup_model_from_kwargs` keeps
    self.__dict__ = self.setup_model_from_kwargs(creation_kwargs)
    self.__show_pk__ = show_pk_flag

parent class-attribute

parent

is_proxy_model class-attribute

is_proxy_model = False

query class-attribute

query = Manager()
query_related = RedirectManager(redirect_name='query')

meta class-attribute

meta = MetaInfo(None, abstract=True)

__proxy_model__ class-attribute

__proxy_model__ = None

__db_model__ class-attribute

__db_model__ = False

__reflected__ class-attribute

__reflected__ = True

__raw_query__ class-attribute

__raw_query__ = None

__using_schema__ class-attribute

__using_schema__ = None

__show_pk__ class-attribute instance-attribute

__show_pk__ = __show_pk__

raw_query property writable

raw_query

proxy_model cached property

proxy_model

identifying_db_fields cached property

identifying_db_fields

The columns used for loading. Can be set per instance; defaults to pknames.

can_load property

can_load

signals cached property

signals

table property writable

table

pkcolumns property

pkcolumns

pknames property

pknames

Meta

abstract class-attribute instance-attribute

abstract = True

declarative classmethod

declarative()
Source code in edgy/core/db/models/mixins/generics.py
15
16
17
@classmethod
def declarative(cls) -> Any:
    """Shortcut that returns the result of `generate_model_declarative()`."""
    declarative_model = cls.generate_model_declarative()
    return declarative_model

generate_model_declarative classmethod

generate_model_declarative()

Transforms a core Edgy table into a Declarative model table.

Source code in edgy/core/db/models/mixins/generics.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@classmethod
def generate_model_declarative(cls) -> Any:
    """
    Create a declarative class on top of this model's core table.

    The class is derived from the registry's `declarative_base` and reuses
    `cls.table` as its `__table__`. Every column carrying foreign keys
    additionally gets a relationship property named
    ``<column name>_relation`` added to the mapper of the new class.
    """
    declarative_base = cls.meta.registry.declarative_base

    # The declarative class wraps the already built core table
    namespace = {"__table__": cls.table}
    declarative_model = type(cls.__name__, (declarative_base,), namespace)

    for column in cls.table.columns:
        if column.foreign_keys:
            # The relationship target is read from the field named like the column
            field = cls.fields.get(column.name)
            if inspect.isclass(field.to):
                to = field.to.__name__
            else:
                to = field.to
            mapped_model: Mapped[to] = relationship(to)  # type: ignore

            # Register the relationship on the generated class
            declarative_model.__mapper__.add_property(f"{column.name}_relation", mapped_model)

    return declarative_model

_extract_model_references

_extract_model_references(extracted_values, model_class)

Extracts any possible model references from the EdgyModel and returns a dictionary.

PARAMETER DESCRIPTION
extracted_values

TYPE: Any

model_class

TYPE: Optional[Type[Model]]

Source code in edgy/core/utils/models.py
61
62
63
64
65
66
67
68
69
70
def _extract_model_references(self, extracted_values: Any, model_class: Optional[Type["Model"]]) -> Any:
    """
    Extracts any possible model references from the EdgyModel and returns a dictionary.

    Only the reference names declared in `model_class.meta.model_references`
    that have a truthy entry in `extracted_values` are included.
    """
    found = {}
    for ref_name in model_class.meta.model_references.keys():  # type: ignore
        if extracted_values.get(ref_name):
            found[ref_name] = extracted_values.get(ref_name, None)
    return found

_extract_values_from_field

_extract_values_from_field(extracted_values, model_class=None, is_update=False, is_partial=False)

Extracts all the default values from the given fields and returns the raw value corresponding to each field.

PARAMETER DESCRIPTION
extracted_values

TYPE: Any

model_class

TYPE: Optional[Model] DEFAULT: None

is_update

TYPE: bool DEFAULT: False

is_partial

TYPE: bool DEFAULT: False

Source code in edgy/core/utils/models.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def _extract_values_from_field(
    self,
    extracted_values: Any,
    model_class: Optional["Model"] = None,
    is_update: bool = False,
    is_partial: bool = False,
) -> Any:
    """
    Clean the passed values field by field and fill in the field defaults.

    Args:
        extracted_values: mapping of field names to the raw input values.
        model_class: object providing `meta` and `fields`; falls back to `self`.
        is_update: when set, read-only fields accepted by `_has_auto_now_add`
            do not get their default applied.
        is_partial: when set, no defaults are applied at all and read-only
            fields are cleaned like any other field.

    Returns:
        The cleaned values, updated with the extracted model references.

    Raises:
        ValueError: if two fields produce a value for the same key.
    """
    target = model_class or self
    cleaned: Dict[str, Any] = {}
    # phase 1: let the input modifying fields rewrite a copy of the input
    modifying_fields = target.meta.input_modifying_fields
    if modifying_fields:
        extracted_values = {**extracted_values}
        for field_name in modifying_fields:
            target.fields[field_name].modify_input(field_name, extracted_values)
    # phase 2: clean the passed values, read-only fields only receive defaults
    for field_name, field in target.fields.items():  # type: ignore
        if not is_partial and field.read_only:
            # For datetimes with `auto_now` and `auto_now_add`:
            # on updates the default is skipped for `auto_now_add` fields
            if field.has_default() and (not is_update or not _has_auto_now_add(field)):
                cleaned.update(field.get_default_values(field_name, cleaned))
            continue
        if field_name not in extracted_values:
            continue
        for sub_name, value in field.clean(field_name, extracted_values[field_name]).items():
            if sub_name in cleaned:
                raise ValueError(f"value set twice for key: {sub_name}")
            cleaned[sub_name] = value

    # phase 3: defaults for the remaining writable fields, skipped when partial
    if not is_partial:
        for field_name, field in target.fields.items():  # type: ignore
            # a second run is required, phase 2 may have produced the key already
            if not field.read_only and field_name not in cleaned and field.has_default():
                cleaned.update(field.get_default_values(field_name, cleaned))
    # Update with any ModelRef
    cleaned.update(self._extract_model_references(extracted_values, target))
    return cleaned

_update_auto_now_fields

_update_auto_now_fields(values, fields)

Updates the auto_now fields

PARAMETER DESCRIPTION
values

TYPE: Any

fields

TYPE: Any

Source code in edgy/core/utils/models.py
40
41
42
43
44
45
46
47
def _update_auto_now_fields(self, values: Any, fields: Any) -> Any:
    """
    Updates the `auto_now` fields

    `values` is updated in place with the default values of every `BaseField`
    accepted by both `_has_auto_now` and `_is_datetime`, and is returned.
    """
    for field_name, candidate in fields.items():
        if not isinstance(candidate, BaseField):
            continue
        if not (_has_auto_now(candidate) and _is_datetime(candidate)):
            continue
        values.update(candidate.get_default_values(field_name, values))
    return values

_resolve_value

_resolve_value(value)
PARAMETER DESCRIPTION
value

TYPE: Any

Source code in edgy/core/utils/models.py
49
50
51
52
53
54
55
56
57
def _resolve_value(self, value: typing.Any) -> typing.Any:
    """
    Convert a value into its raw representation.

    Dictionaries are serialized with `dumps` and decoded as UTF-8, enum
    members are replaced by their name, anything else is returned unchanged.
    """
    if isinstance(value, dict):
        serialized = dumps(value, option=OPT_SERIALIZE_NUMPY | OPT_OMIT_MICROSECONDS)
        return serialized.decode("utf-8")
    if isinstance(value, Enum):
        return value.name
    return value

transform_input classmethod

transform_input(kwargs, phase)

Expand to_model.

PARAMETER DESCRIPTION
kwargs

TYPE: Any

phase

TYPE: str

Source code in edgy/core/db/models/base.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@classmethod
def transform_input(cls, kwargs: Any, phase: str) -> Any:
    """
    Expand to_model.

    Works on a shallow copy of `kwargs`: the input modifying fields may
    rewrite the copy first, afterwards every entry with a matching field is
    replaced by the result of the field's `to_model` for the given `phase`.
    Entries without a field are copied over unchanged.
    """
    raw = {**kwargs}
    expanded: Dict[str, Any] = {}
    known_fields = cls.meta.fields_mapping

    # phase 1: transform
    for modifier_name in cls.meta.input_modifying_fields:
        known_fields[modifier_name].modify_input(modifier_name, raw)
    # phase 2: apply to_model
    for key, value in raw.items():
        field = known_fields.get(key, None)
        if field is None:
            expanded[key] = value
            continue
        expanded.update(**field.to_model(key, value, phase=phase))
    return expanded

setup_model_from_kwargs

setup_model_from_kwargs(kwargs)

Loops and setup the kwargs of the model

PARAMETER DESCRIPTION
kwargs

TYPE: Any

Source code in edgy/core/db/models/base.py
92
93
94
95
96
97
98
99
def setup_model_from_kwargs(self, kwargs: Any) -> Any:
    """
    Filter the kwargs down to the entries known to the model.

    An entry is kept when its key is a field name or a model reference name.
    """
    accepted = {}
    for key, value in kwargs.items():
        if key in self.meta.fields_mapping or key in self.meta.model_references:
            accepted[key] = value
    return accepted

get_columns_for_name

get_columns_for_name(name)
PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
174
175
176
177
178
179
180
181
182
def get_columns_for_name(self, name: str) -> Sequence["sqlalchemy.Column"]:
    """
    Return the columns registered for `name`.

    The field-to-columns mapping of the meta is consulted first, then the
    table column of the same name; `_empty` is returned when neither matches.
    """
    model_table = self.table
    model_meta = self.meta
    if name in model_meta.field_to_columns:
        return model_meta.field_to_columns[name]
    if name in model_table.columns:
        return (model_table.columns[name],)
    return cast(Sequence["sqlalchemy.Column"], _empty)

identifying_clauses

identifying_clauses()
Source code in edgy/core/db/models/base.py
184
185
186
187
188
189
190
191
def identifying_clauses(self) -> Iterable[Any]:
    """
    Yield the equality clauses built from the identifying db fields.

    Values are read from the instance dict. When a field is registered for
    the name, its `clean` output provides the column/value pairs; otherwise
    the table column of the same name is compared with the raw value.
    """
    for field_name in self.identifying_db_fields:
        field = self.meta.fields_mapping.get(field_name)
        if field is None:
            # no field under that name: compare the column directly
            yield getattr(self.table.columns, field_name) == self.__dict__[field_name]
            continue
        cleaned = field.clean(field_name, self.__dict__[field_name])
        for column_name, column_value in cleaned.items():
            yield getattr(self.table.columns, column_name) == column_value

generate_proxy_model classmethod

generate_proxy_model()

Generates a proxy model for each model. This proxy model is a simple shallow copy of the original model being generated.

Source code in edgy/core/db/models/base.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@classmethod
def generate_proxy_model(cls) -> Type["Model"]:
    """
    Return the proxy model of the class, building it when none is set.

    An already set `__proxy_model__` is returned as is. Otherwise a
    `ProxyModel` is built from shallow copies of the fields and its model is
    passed through `generify_model_fields` before being returned.
    """
    existing = cls.__proxy_model__
    if existing:
        return existing

    field_copies = {}
    for key, field in cls.meta.fields_mapping.items():
        field_copies[key] = copy.copy(field)

    proxy = ProxyModel(
        name=cls.__name__,
        module=cls.__module__,
        metadata=cls.meta,
        definitions=field_copies,
    )
    proxy.build()
    generify_model_fields(proxy.model)
    return proxy.model

model_dump

model_dump(show_pk=None, **kwargs)

An updated version of the model dump. It can show the pk always and handles the exclude attribute on fields correctly and contains the custom logic for fields with getters

PARAMETER DESCRIPTION
show_pk

TYPE: Union[bool, None] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Extra Args

show_pk: bool - Enforces showing the primary key in the model_dump.

Source code in edgy/core/db/models/base.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def model_dump(self, show_pk: Union[bool, None] = None, **kwargs: Any) -> Dict[str, Any]:
    """
    An updated version of the model dump.
    It can show the pk always and handles the exclude attribute on fields correctly and
    contains the custom logic for fields with getters

    Args:
        show_pk: bool - Enforces showing the primary key in the model_dump.
        **kwargs: `exclude`, `include` and `mode` are consumed here, the
            remaining arguments are forwarded to the parent `model_dump`.

    Returns:
        The dumped model. Fields with a special getter are dumped through the
        field's `__get__` and stored under the field's serialization alias,
        alias or name (in that order of preference).
    """
    # we want a copy
    exclude: Union[Set[str], Dict[str, Any], None] = kwargs.pop("exclude", None)
    if exclude is None:
        initial_full_field_exclude = _empty
        # must be writable
        exclude = set()
    elif isinstance(exclude, dict):
        # only entries excluded as a whole (value True), not the sub filters
        initial_full_field_exclude = {k for k, v in exclude.items() if v is True}
        exclude = copy.copy(exclude)
    else:
        initial_full_field_exclude = set(exclude)
        exclude = copy.copy(initial_full_field_exclude)

    if isinstance(exclude, dict):
        exclude["__show_pk__"] = True
        for field_name in self.meta.excluded_fields:
            exclude[field_name] = True
    else:
        # the getter fields are dumped manually below
        exclude.update(self.meta.special_getter_fields)
        exclude.update(self.meta.excluded_fields)
        exclude.add("__show_pk__")
    include: Union[Set[str], Dict[str, Any], None] = kwargs.pop("include", None)
    mode: Union[Literal["json", "python"], str] = kwargs.pop("mode", "python")

    should_show_pk = show_pk or self.__show_pk__
    model = dict(super().model_dump(exclude=exclude, include=include, mode=mode, **kwargs))
    # Workaround for metafields, computed field logic introduces many problems
    # so reimplement the logic here
    for field_name in self.meta.special_getter_fields:
        if field_name == "pk":
            continue
        field: BaseField = self.meta.fields_mapping[field_name]
        # the filters are bypassed for primary key fields when the pk must be shown
        if not should_show_pk or field_name not in self.pknames:
            if field_name in initial_full_field_exclude:
                continue
            if include is not None and field_name not in include:
                continue
            # check the flag on the field object; the field name is a plain
            # string and never carries an `exclude` attribute
            if getattr(field, "exclude", False):
                continue
        try:
            retval = field.__get__(self, self.__class__)
        except AttributeError:
            # the getter has no value for this instance, skip the field
            continue
        sub_include = None
        if isinstance(include, dict):
            sub_include = include.get(field_name, None)
            if sub_include is True:
                sub_include = None
        sub_exclude = None
        if isinstance(exclude, dict):
            sub_exclude = exclude.get(field_name, None)
            if sub_exclude is True:
                sub_exclude = None
        if isinstance(retval, BaseModel):
            retval = retval.model_dump(include=sub_include, exclude=sub_exclude, mode=mode, **kwargs)
        else:
            assert (
                sub_include is None
            ), "sub include filters for CompositeField specified, but no Pydantic model is set"
            assert (
                sub_exclude is None
            ), "sub exclude filters for CompositeField specified, but no Pydantic model is set"
            if mode == "json" and not getattr(field, "unsafe_json_serialization", False):
                # skip field if it isn't a BaseModel and the mode is json and unsafe_json_serialization is not set
                # currently unsafe_json_serialization exists only on ConcreteCompositeFields
                continue
        alias: str = field_name
        if getattr(field, "serialization_alias", None):
            alias = cast(str, field.serialization_alias)
        elif getattr(field, "alias", None):
            alias = field.alias
        model[alias] = retval
    # tenants cause excluded fields to reappear
    # TODO: find a better bugfix
    for excluded_field in self.meta.excluded_fields:
        model.pop(excluded_field, None)
    return model

build classmethod

build(schema=None)

The inspect is done in an async manner and reflects the objects from the database.

PARAMETER DESCRIPTION
schema

TYPE: Optional[str] DEFAULT: None

Source code in edgy/core/db/models/base.py
455
456
457
458
459
460
461
462
463
464
465
466
467
@classmethod
def build(cls, schema: Optional[str] = None) -> Any:
    """
    Reflect the table of the model from the database.

    The passed `schema` (or the registry's `db_schema` when none is given) is
    set on the registry metadata, then the `reflect` coroutine is executed
    through `run_sync` and its result returned.
    """
    registry = cls.meta.registry
    assert registry is not None, "registry is not set"
    registry_metadata: sqlalchemy.MetaData = registry._metadata
    target_schema = schema or registry.db_schema
    registry_metadata.schema = target_schema

    table_name: str = cast("str", cls.meta.tablename)
    reflection = cls.reflect(registry, table_name, registry_metadata, target_schema)
    return run_sync(reflection)

_get_unique_constraints classmethod

_get_unique_constraints(columns)

Returns the unique constraints for the model.

The columns must be a list, a tuple of strings or a UniqueConstraint object.

:return: Model UniqueConstraint.

PARAMETER DESCRIPTION
columns

TYPE: Union[Sequence, str, UniqueConstraint]

Source code in edgy/core/db/models/base.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
@classmethod
def _get_unique_constraints(
    cls, columns: Union[Sequence, str, sqlalchemy.UniqueConstraint]
) -> Optional[sqlalchemy.UniqueConstraint]:
    """
    Returns the unique constraint for the model.

    The columns must be a single string, a list or tuple of strings, or a
    UniqueConstraint object whose fields and name are taken over.

    :return: Model UniqueConstraint.
    """
    if isinstance(columns, str):
        column_names, options = (columns,), {}
    elif isinstance(columns, UniqueConstraint):
        column_names, options = columns.fields, {"name": columns.name}
    else:
        column_names, options = columns, {}
    return sqlalchemy.UniqueConstraint(*column_names, **options)

_get_indexes classmethod

_get_indexes(index)

Creates the index based on the Index fields

PARAMETER DESCRIPTION
index

TYPE: Index

Source code in edgy/core/db/models/base.py
360
361
362
363
364
365
@classmethod
def _get_indexes(cls, index: Index) -> Optional[sqlalchemy.Index]:
    """
    Build the SQLAlchemy index from the name and the fields of the passed Index.
    """
    index_name, index_fields = index.name, index.fields
    return sqlalchemy.Index(index_name, *index_fields)  # type: ignore

extract_db_fields

extract_db_fields()

Extracts all the db fields, model references and fields. Related fields are not included because they are disjoint.

Source code in edgy/core/db/models/base.py
367
368
369
370
371
372
373
374
375
def extract_db_fields(self) -> Dict[str, Any]:
    """
    Extracts all the db fields, model references and fields.
    Related fields are not included because they are disjoint.

    The result holds the instance dict entries whose key is a field name, a
    column of the class or a model reference name.
    """
    known_fields = self.meta.fields_mapping
    known_references = self.meta.model_references
    known_columns = self.__class__.columns

    extracted = {}
    for key, value in self.__dict__.items():
        if key in known_fields or key in known_columns or key in known_references:
            extracted[key] = value
    return extracted

get_instance_name

get_instance_name()

Returns the name of the class in lowercase.

Source code in edgy/core/db/models/base.py
377
378
379
380
381
def get_instance_name(self) -> str:
    """
    Returns the name of the class in lowercase.
    """
    class_name = self.__class__.__name__
    return class_name.lower()

load async

load()
Source code in edgy/core/db/models/model.py
77
78
79
80
81
82
83
84
85
86
87
88
async def load(self) -> None:
    """
    Fetch the row matching the identifying clauses and update the instance.

    Raises:
        ObjectNotFound: if no row is returned for the instance.
    """
    # Select the row matching the identifying clauses of this instance.
    query = self.table.select().where(*self.identifying_clauses())

    fetched = await self.database.fetch_one(query)
    if fetched is None:
        # the row is not in the database (anymore)
        raise ObjectNotFound("row does not exist anymore")
    # Merge the transformed row values into the instance dict.
    loaded_values = self.transform_input(dict(fetched._mapping), phase="load")
    self.__dict__.update(loaded_values)

__setattr__

__setattr__(key, value)
PARAMETER DESCRIPTION
key

TYPE: str

value

TYPE: Any

Source code in edgy/core/db/models/base.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def __setattr__(self, key: str, value: Any) -> None:
    """
    Set an attribute, routing registered fields through their own logic.

    Keys without a field are set directly with `edgy_setattr`. Fields
    providing `__set__` receive the value through it; all other fields expand
    the value with `to_model` (phase "set") and every resulting entry is set
    with `edgy_setattr`.
    """
    field = self.meta.fields_mapping.get(key, None)
    if field is None:
        # bypass __setattr__
        edgy_setattr(self, key, value)
        return
    if hasattr(field, "__set__"):
        # not recommended, better to use to_model instead
        # used in related_fields to mask and not to implement to_model
        field.__set__(self, value)
        return
    for attr_name, attr_value in field.to_model(key, value, phase="set").items():
        # bypass __setattr__
        edgy_setattr(self, attr_name, attr_value)

__getattr__

__getattr__(name)

Does the following things: 1. Initializes managers on access. 2. Redirects get accesses to getter fields. 3. Runs a one-off query to populate any foreign key, making sure it runs only once per foreign key to avoid multiple database calls.

PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def __getattr__(self, name: str) -> Any:
    """
    Resolve attributes that are not found through the regular lookup.

    1. Managers are copied, bound to the instance and cached in the instance
       dict on first access.
    2. Fields providing `__get__` are read through their getter.
    3. For other fields that are missing from the instance dict and are not
       identifying db fields, the instance is loaded once (when it can be
       loaded) and the value is read from the instance dict afterwards.

    Everything else is delegated to the parent `__getattr__`.
    """
    manager = self.meta.managers.get(name)
    if manager is not None:
        if name not in self.__dict__:
            bound_manager = copy.copy(manager)
            bound_manager.instance = self
            self.__dict__[name] = bound_manager
        return self.__dict__[name]

    field = self.meta.fields_mapping.get(name)
    if field is not None:
        if hasattr(field, "__get__"):
            # no need to set a descriptor object
            return field.__get__(self, self.__class__)
        if name not in self.__dict__ and name not in self.identifying_db_fields and self.can_load:
            run_sync(self.load())
            return self.__dict__[name]
    return super().__getattr__(name)

__eq__

__eq__(other)
PARAMETER DESCRIPTION
other

TYPE: Any

Source code in edgy/core/db/models/base.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
def __eq__(self, other: Any) -> bool:
    """
    Compare two model instances by registry, table name and db field values.

    Objects that do not expose `meta` and `extract_db_fields` (for example
    `None` or plain values) are never equal to a model; they return False
    instead of raising an AttributeError.

    Args:
        other: the object to compare with.

    Returns:
        True when both objects share the registry and the table name and
        every extracted value matches, otherwise False. A key missing on one
        side is compared as None.
    """
    # anything that is not model-like can never be equal
    if not hasattr(other, "meta") or not hasattr(other, "extract_db_fields"):
        return False
    # if self.__class__ != other.__class__:
    #     return False
    # somehow meta gets regenerated, so just compare tablename and registry.
    if self.meta.registry is not other.meta.registry:
        return False
    if self.meta.tablename != other.meta.tablename:
        return False
    self_dict = self._extract_values_from_field(self.extract_db_fields(), is_partial=True)
    other_dict = self._extract_values_from_field(other.extract_db_fields(), is_partial=True)
    key_set = {*self_dict.keys(), *other_dict.keys()}
    for field_name in key_set:
        if self_dict.get(field_name) != other_dict.get(field_name):
            return False
    return True

reflect async classmethod

reflect(registry, tablename, metadata, schema=None)

Reflect a table from the database and return its SQLAlchemy Table object.

This method connects to the database using the provided registry, reflects the table with the given name and metadata, and returns the SQLAlchemy Table object.

PARAMETER DESCRIPTION
registry

TYPE: Registry

tablename

TYPE: str

metadata

TYPE: MetaData

schema

TYPE: Union[str, None] DEFAULT: None

PARAMETER DESCRIPTION
registry

The registry object containing the database engine.

TYPE: Registry

tablename

The name of the table to reflect.

TYPE: str

metadata

The SQLAlchemy MetaData object to associate with the reflected table.

TYPE: MetaData

schema

The schema name where the table is located. Defaults to None.

TYPE: Union[str, None] DEFAULT: None

RETURNS DESCRIPTION
Table

sqlalchemy.Table: The reflected SQLAlchemy Table object.

RAISES DESCRIPTION
ImproperlyConfigured

If there is an error during the reflection process.

Source code in edgy/core/db/models/base.py
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
@classmethod
async def reflect(
    cls,
    registry: "Registry",
    tablename: str,
    metadata: sqlalchemy.MetaData,
    schema: Union[str, None] = None,
) -> sqlalchemy.Table:
    """
    Reflect a table from the database and return its SQLAlchemy Table object.

    This method connects to the database using the provided registry, reflects
    the table with the given name and metadata, and returns the SQLAlchemy
    Table object. The engine of the registry is disposed afterwards, whether
    the reflection succeeded or not.

    Parameters:
        registry (Registry): The registry object containing the database engine.
        tablename (str): The name of the table to reflect.
        metadata (sqlalchemy.MetaData): The SQLAlchemy MetaData object to associate with the reflected table.
        schema (Union[str, None], optional): The schema name where the table is located. Defaults to None.

    Returns:
        sqlalchemy.Table: The reflected SQLAlchemy Table object.

    Raises:
        ImproperlyConfigured: If there is an error during the reflection process.
    """

    def execute_reflection(connection: "sqlalchemy.engine.Connection") -> sqlalchemy.Table:
        """Helper function to create and reflect the table."""
        # `run_sync` hands over the synchronous connection, which is what
        # `autoload_with` works with
        return sqlalchemy.Table(tablename, metadata, schema=schema, autoload_with=connection)

    try:
        try:
            async with registry.engine.begin() as connection:
                table = await connection.run_sync(execute_reflection)
        finally:
            # release the pooled connections even when the reflection failed
            await registry.engine.dispose()
        return table
    except Exception as e:
        raise ImproperlyConfigured(detail=str(e)) from e

from_sqla_row classmethod

from_sqla_row(row, select_related=None, prefetch_related=None, is_only_fields=False, only_fields=None, is_defer_fields=False, exclude_secrets=False, using_schema=None)

Class method to convert a SQLAlchemy Row result into a EdgyModel row type.

Looping through select_related fields if the query comes from a select_related operation. Validates if exists the select_related and related_field inside the models.

When select_related and related_field exist for the same field being validated, the related field is ignored as it won't override the value already collected from the select_related.

If there is no select_related, then goes through the related field where it should only return the instance of the ForeignKey with the ID, making it lazy loaded.

:return: Model class.

PARAMETER DESCRIPTION
row

TYPE: Row

select_related

TYPE: Optional[Sequence[Any]] DEFAULT: None

prefetch_related

TYPE: Optional[Sequence[Prefetch]] DEFAULT: None

is_only_fields

TYPE: bool DEFAULT: False

only_fields

TYPE: Sequence[str] DEFAULT: None

is_defer_fields

TYPE: bool DEFAULT: False

exclude_secrets

TYPE: bool DEFAULT: False

using_schema

TYPE: Union[str, None] DEFAULT: None

Source code in edgy/core/db/models/row.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@classmethod
def from_sqla_row(
    cls,
    row: "Row",
    select_related: Optional[Sequence[Any]] = None,
    prefetch_related: Optional[Sequence["Prefetch"]] = None,
    is_only_fields: bool = False,
    only_fields: Sequence[str] = None,
    is_defer_fields: bool = False,
    exclude_secrets: bool = False,
    using_schema: Union[str, None] = None,
) -> Optional[Type["Model"]]:
    """
    Class method to convert a SQLAlchemy Row result into a EdgyModel row type.

    Looping through select_related fields if the query comes from a select_related operation.
    Validates if exists the select_related and related_field inside the models.

    When select_related and related_field exist for the same field being validated, the related
    field is ignored as it won't override the value already collected from the select_related.

    If there is no select_related, then goes through the related field where it **should**
    only return the instance of the ForeignKey with the ID, making it lazy loaded.

    Args:
        row: the row to convert.
        select_related: related paths (segments separated by ``__``) whose models are
            built from the same row.
        prefetch_related: prefetches handed over to the prefetch handling.
        is_only_fields: restrict the populated columns to `only_fields`.
        only_fields: the names used when `is_only_fields` is set.
        is_defer_fields: build the proxy model from the keys present in the row.
        exclude_secrets: skip the fields flagged as secret and build the proxy model.
        using_schema: schema applied to the created models when not None.

    Raises:
        Exception: if a select_related path starts with a field that is not a
            RelationshipField.

    :return: The model (or proxy model) instance built from the row.
    """
    item: Dict[str, Any] = {}
    select_related = select_related or []
    prefetch_related = prefetch_related or []
    # names of the secret fields, only collected when secrets must be excluded
    secret_fields = [name for name, field in cls.fields.items() if field.secret] if exclude_secrets else []

    for related in select_related:
        # the first path segment names the relationship field on this model
        field_name = related.split("__", 1)[0]
        field = cls.meta.fields_mapping[field_name]
        if isinstance(field, RelationshipField):
            model_class, _, remainder = field.traverse_field(related)
        else:
            raise Exception("invalid field")
        if remainder:
            # deeper path: the related model resolves the remaining segments itself
            item[field_name] = model_class.from_sqla_row(
                row,
                select_related=[remainder],
                prefetch_related=prefetch_related,
                exclude_secrets=exclude_secrets,
                using_schema=using_schema,
            )
        else:
            item[field_name] = model_class.from_sqla_row(row, exclude_secrets=exclude_secrets, using_schema=using_schema)
    # Populate the related names
    # Making sure if the model being queried is not inside a select related
    # This way it is not overwritten by any value
    for related, foreign_key in cls.meta.foreign_key_fields.items():
        ignore_related: bool = cls.__should_ignore_related_name(related, select_related)
        if ignore_related or related in secret_fields:
            continue

        columns_to_check = foreign_key.get_column_names(related)
        # skip the foreign key as soon as one of its columns is secret
        if secret_fields and not columns_to_check.isdisjoint(secret_fields):
            continue

        model_related = foreign_key.target

        # Apply the schema to the model
        model_related = cls.__apply_schema(model_related, using_schema)

        child_item = {}
        for column_name in columns_to_check:
            # NOTE(review): assumes `in` on the row tests column names rather
            # than values - confirm for the row type passed in
            if column_name not in row:
                continue
            elif row[column_name] is not None:  # type: ignore
                child_item[foreign_key.from_fk_field_name(related, column_name)] = row[column_name]  # type: ignore

        # Make sure we generate a temporary reduced model
        # For the related fields. We simply change the structure of the model
        # and rebuild it with the new fields.
        item[related] = model_related.proxy_model(**child_item)

    # Check for the only_fields
    if is_only_fields or is_defer_fields:
        # with only_fields the requested names are used, otherwise the keys of the row
        mapping_fields = (
            [str(field) for field in only_fields] if is_only_fields else list(row.keys())  # type: ignore
        )

        for column, value in row._mapping.items():
            if column in secret_fields:
                continue
            # Making sure when a table is reflected, maps the right fields of the ReflectModel
            if column not in mapping_fields:
                continue

            # values collected from the relations above are kept
            if column not in item:
                item[column] = value

        # We need to generify the model fields to make sure we can populate the
        # model without mandatory fields
        model = cast("Type[Model]", cls.proxy_model(**item))

        # Apply the schema to the model
        model = cls.__apply_schema(model, using_schema)

        model = cls.__handle_prefetch_related(row=row, model=model, prefetch_related=prefetch_related)
        return model
    else:
        # Pull out the regular column values.
        for column in cls.table.columns:
            # Making sure when a table is reflected, maps the right fields of the ReflectModel
            if column.name in secret_fields:
                continue
            if column.name not in cls.fields.keys():
                continue
            elif column.name not in item:
                item[column.name] = row[column]

    # the proxy model is used when secrets are excluded, the model itself otherwise
    model = (
        cast("Type[Model]", cls(**item)) if not exclude_secrets else cast("Type[Model]", cls.proxy_model(**item))
    )

    # Apply the schema to the model
    model = cls.__apply_schema(model, using_schema)

    # Handle prefetch related fields.
    model = cls.__handle_prefetch_related(row=row, model=model, prefetch_related=prefetch_related)
    return model

__apply_schema classmethod

__apply_schema(model, schema=None)
PARAMETER DESCRIPTION
model

TYPE: Type[Model]

schema

TYPE: Optional[str] DEFAULT: None

Source code in edgy/core/db/models/row.py
146
147
148
149
150
151
152
@classmethod
def __apply_schema(cls, model: Type["Model"], schema: Optional[str] = None) -> Type["Model"]:
    """
    Rebuild the tables of `model` and of its proxy model for `schema`.

    When `schema` is None nothing is touched. In every case the same `model`
    object that was passed in is handed back.
    """
    if schema is None:
        return model

    # Both the model and its proxy model get a table built for the schema.
    model.table = model.build(schema)  # type: ignore
    proxy = model.proxy_model
    proxy.table = proxy.build(schema)  # type: ignore
    return model
__should_ignore_related_name(related_name, select_related)

Validates if it should populate the related field if select related is not considered.

PARAMETER DESCRIPTION
related_name

TYPE: str

select_related

TYPE: Sequence[str]

Source code in edgy/core/db/models/row.py
154
155
156
157
158
159
160
161
162
163
@classmethod
def __should_ignore_related_name(cls, related_name: str, select_related: Sequence[str]) -> bool:
    """
    Tell whether `related_name` is one of the "__"-separated segments of any
    path listed in `select_related`.
    """
    return any(related_name in path.split("__") for path in select_related)
__handle_prefetch_related(row, model, prefetch_related, parent_cls=None, reverse_path='', is_nested=False)

Handles any prefetch related scenario from the model. Loads in advance all the models needed for a specific record.

Recursively checks for the related field and validates if there is any conflicting attribute. If there is, a QuerySetError is raised.

PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Type[Model]

prefetch_related

TYPE: Sequence[Prefetch]

parent_cls

TYPE: Optional[Type[Model]] DEFAULT: None

reverse_path

TYPE: str DEFAULT: ''

is_nested

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/row.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
@classmethod
def __handle_prefetch_related(
    cls,
    row: "Row",
    model: Type["Model"],
    # for advancing
    prefetch_related: Sequence["Prefetch"],
    parent_cls: Optional[Type["Model"]] = None,
    # for going back
    reverse_path: str = "",
    is_nested: bool = False,
) -> Type["Model"]:
    """
    Handles any prefetch related scenario from the model.
    Loads in advance all the models needed for a specific record

    Recursively checks for the related field and validates if there is any conflicting
    attribute. If there is, a `QuerySetError` is raised.

    Parameters:
        row: The result row; primary key values are read from it by column name.
        model: The object that receives the fetched records under each `to_attr`.
        prefetch_related: The `Prefetch` entries to resolve, one after another.
        parent_cls: Object checked for a clashing `to_attr`; falls back to `model`
            when falsy.
        reverse_path: "__"-joined reverse lookup accumulated so far. It is reset
            to "" for every entry when `is_nested` is False.
        is_nested: True when called from the recursive step on a remaining path.

    Returns:
        The `model` that was passed in.

    Raises:
        QuerySetError: On a non-nested call, when `parent_cls` already has an
            attribute named like `to_attr`.
        Exception: When the first path segment is not a `RelationshipField` or
            when no reverse part can be derived from it.
    """
    if not parent_cls:
        parent_cls = model

    for related in prefetch_related:
        if not is_nested:
            # Check for conflicting names
            # If to_attr has the same name of any
            if hasattr(parent_cls, related.to_attr):
                raise QuerySetError(
                    f"Conflicting attribute to_attr='{related.related_name}' with '{related.to_attr}' in {parent_cls.__class__.__name__}"
                )

        # A top-level entry always starts with an empty reverse path, so the
        # path built for a previous entry does not leak into this one.
        if not is_nested:
            reverse_path = ""

        if "__" in related.related_name:
            # Multi-segment path: only the first segment is looked up here;
            # `remainder` is replaced below by what traverse_field returns.
            first_part, remainder = related.related_name.split("__", 1)

            field = cls.meta.fields_mapping[first_part]
            if isinstance(field, RelationshipField):
                model_class, reverse_part, remainder = field.traverse_field(related.related_name)
                if not reverse_part:
                    raise Exception("No reverse relation possible (missing related_name)")
            else:
                raise Exception("invalid field")

            # Build the new nested Prefetch object
            remainder_prefetch = related.__class__(
                related_name=remainder, to_attr=related.to_attr, queryset=related.queryset
            )
            # Prepend the reverse name of this hop to the path collected so far.
            if reverse_path:
                reverse_path = f"{reverse_part}__{reverse_path}"
            else:
                reverse_path = reverse_part

            # Recursively continue the process of handling the
            # new prefetch
            # The return value is not used; the nested call attaches the
            # records to `model` via setattr.
            model_class.__handle_prefetch_related(
                row,
                model,
                prefetch_related=[remainder_prefetch],
                reverse_path=reverse_path,
                parent_cls=model,
                is_nested=True,
            )

        # Check for individual not nested querysets
        elif related.queryset is not None and not is_nested:
            # Restrict the user supplied queryset to this row by filtering on
            # "<related_name>__<pk column>" for every primary key column.
            extra = {}
            for pkcol in cls.pkcolumns:
                filter_by_pk = row[pkcol]
                extra[f"{related.related_name}__{pkcol}"] = filter_by_pk
            related.queryset.extra = extra

            # Execute the queryset
            # NOTE(review): run_until_complete cannot be used on a loop that is
            # already running — confirm how callers invoke this method.
            records = asyncio.get_event_loop().run_until_complete(cls.run_query(queryset=related.queryset))
            setattr(model, related.to_attr, records)
        else:
            # Single-segment name without a usable top-level queryset (or a
            # nested call): delegate the lookup of the related records.
            records = cls.process_nested_prefetch_related(
                row,
                prefetch_related=related,
                reverse_path=reverse_path,
                parent_cls=model,
                queryset=related.queryset,
            )

            setattr(model, related.to_attr, records)
    return model
process_nested_prefetch_related(row, prefetch_related, parent_cls, reverse_path, queryset)

Processes the nested prefetch related names.

PARAMETER DESCRIPTION
row

TYPE: Row

prefetch_related

TYPE: Prefetch

parent_cls

TYPE: Type[Model]

reverse_path

TYPE: str

queryset

TYPE: QuerySet

Source code in edgy/core/db/models/row.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@classmethod
def process_nested_prefetch_related(
    cls,
    row: "Row",
    prefetch_related: "Prefetch",
    parent_cls: Type["Model"],
    reverse_path: str,
    queryset: "QuerySet",
) -> List[Type["Model"]]:
    """
    Fetch the records behind a single-segment prefetch name.

    The field named by `prefetch_related.related_name` is looked up on this
    class and must be a `RelationshipField`; its reverse name is prepended to
    `reverse_path`, and the resulting path, combined with every primary key
    column of `parent_cls`, forms the filters handed to `run_query`.

    Raises:
        Exception: If the field is not a relationship field, or if no reverse
            part can be derived from it.
    """
    relation_name = prefetch_related.related_name
    relation = cls.meta.fields_mapping[relation_name]
    if not isinstance(relation, RelationshipField):
        raise Exception("invalid field")

    target_model, reverse_part, _ = relation.traverse_field(relation_name)
    if not reverse_part:
        raise Exception("No backward relation possible (missing related_name)")

    full_reverse_path = f"{reverse_part}__{reverse_path}" if reverse_path else reverse_part

    # TODO: related_field.clean would be better
    # fix this later when allowing selecting fields for foreign keys
    # One filter per primary key column, valued from the current row.
    filters = {f"{full_reverse_path}__{pkcol}": row[pkcol] for pkcol in parent_cls.pkcolumns}

    loop = asyncio.get_event_loop()
    return loop.run_until_complete(cls.run_query(target_model, filters, queryset))

run_query async classmethod

run_query(model=None, extra=None, queryset=None)

Runs a specific query against a given model with filters.

PARAMETER DESCRIPTION
model

TYPE: Optional[Type[Model]] DEFAULT: None

extra

TYPE: Optional[Dict[str, Any]] DEFAULT: None

queryset

TYPE: Optional[QuerySet] DEFAULT: None

Source code in edgy/core/db/models/row.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
@classmethod
async def run_query(
    cls,
    model: Optional[Type["Model"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    queryset: Optional["QuerySet"] = None,
) -> Union[List[Type["Model"]], Any]:
    """
    Runs a specific query against a given model with filters.

    Parameters:
        model: Model whose `query` manager is filtered when no usable
            `queryset` is given. It is dereferenced without a None check in
            that case, so it has to be supplied then.
        extra: Filter keyword arguments. A missing or empty value means
            "no additional filters".
        queryset: Queryset to await instead of filtering `model`. When `extra`
            is non-empty it is assigned to `queryset.extra` first.

    Returns:
        Whatever awaiting the filtered manager call or the queryset yields.
    """

    if not queryset:
        # `extra` defaults to None and `**None` raises TypeError, so fall back
        # to an empty mapping to make the documented default usable.
        return await model.query.filter(**(extra or {}))  # type: ignore

    if extra:
        queryset.extra = extra

    return await queryset

update async

update(**kwargs)

Update operation of the database fields.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def update(self, **kwargs: Any) -> Any:
    """
    Write the given column values to the row identified by this instance.

    `pre_update` is sent first and `post_update` after the optional database
    write. The written values (including whatever `_update_auto_now_fields`
    returned) are then mirrored onto the instance, and every attribute stored
    in `__dict__` under a mapped field name that is a `ManyRelationProtocol`
    gets this instance assigned and its related objects saved.

    Returns:
        This instance.
    """
    sender = self.__class__
    await self.signals.pre_update.send_async(sender, instance=self)

    # An update without values skips the database round trip entirely.
    if kwargs:
        kwargs = self._update_auto_now_fields(kwargs, self.fields)
        statement = self.table.update()
        statement = statement.values(**kwargs)
        statement = statement.where(*self.identifying_clauses())
        await self.database.execute(statement)
    await self.signals.post_update.send_async(sender, instance=self)

    # Mirror the persisted values on the in-memory instance.
    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)

    for field_name in self.meta.fields_mapping.keys():
        candidate = self.__dict__.get(field_name)
        if not isinstance(candidate, ManyRelationProtocol):
            continue
        candidate.instance = self
        await candidate.save_related()
    return self

delete async

delete()

Delete operation from the database

Source code in edgy/core/db/models/model.py
68
69
70
71
72
73
74
75
async def delete(self) -> None:
    """
    Delete the row identified by this instance.

    `pre_delete` is sent before and `post_delete` after the statement runs.
    """
    sender = self.__class__
    await self.signals.pre_delete.send_async(sender, instance=self)

    statement = self.table.delete()
    statement = statement.where(*self.identifying_clauses())
    await self.database.execute(statement)

    await self.signals.post_delete.send_async(sender, instance=self)

_save async

_save(**kwargs)

Performs the save instruction.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def _save(self, **kwargs: Any) -> "Model":
    """
    Insert a row built from `kwargs` and sync the instance with the result.

    After the insert, the values returned by `transform_input` for the
    "post_insert" phase are set on the instance. A truthy result of the
    insert is stored under the key of the table's autoincrement column when
    there is one. Finally every attribute stored in `__dict__` under a mapped
    field name that is a `ManyRelationProtocol` gets this instance assigned
    and its related objects saved.

    Returns:
        This instance.
    """
    insert_statement = self.table.insert().values(**kwargs)
    generated_key = await self.database.execute(insert_statement)

    post_insert_values = self.transform_input(kwargs, phase="post_insert")
    for attr_name, attr_value in post_insert_values.items():
        setattr(self, attr_name, attr_value)

    # sqlalchemy supports only one autoincrement column
    if generated_key:
        auto_column = self.table.autoincrement_column
        if auto_column is not None:
            setattr(self, auto_column.key, generated_key)

    for field_name in self.meta.fields_mapping.keys():
        candidate = self.__dict__.get(field_name)
        if not isinstance(candidate, ManyRelationProtocol):
            continue
        candidate.instance = self
        await candidate.save_related()
    return self

save_model_references async

save_model_references(model_references, model_ref=None)

If there is any ModelRef declared in the model, it will generate the subsequent model reference records for that same model created.

PARAMETER DESCRIPTION
model_references

TYPE: Any

model_ref

TYPE: Any DEFAULT: None

Source code in edgy/core/db/models/model.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
async def save_model_references(self, model_references: Any, model_ref: Any = None) -> None:
    """
    If there is any ModelRef declared in the model, it will generate the subsequent model
    reference records for that same model created.

    Parameters:
        model_references: Iterable of references. Each item is either an object
            exposing `__model__` and `model_dump`, or a plain dict that is
            turned into the reference class registered under `model_ref`.
        model_ref: Key into `self.meta.model_references`; only read for
            references given as dicts.

    Raises:
        RelationshipNotFound: When the referenced model has no foreign key
            whose target is the class of this instance.
    """

    for reference in model_references:
        if isinstance(reference, dict):
            model: Type["Model"] = self.meta.model_references[model_ref].__model__  # type: ignore
        else:
            model = reference.__model__  # type: ignore

        # A model given by name is resolved through the registry.
        if isinstance(model, str):
            model = self.meta.registry.models[model]  # type: ignore

        # If the reference did come in a dict format
        # It is necessary to convert into the original ModelRef.
        if isinstance(reference, dict):
            reference = self.meta.model_references[model_ref](**reference)  # type: ignore

        # Find the foreign key on the referenced model that points back to
        # this instance's class; when several match, the last one is used.
        foreign_key_target_field = None
        for name, foreign_key in model.meta.foreign_key_fields.items():
            if foreign_key.target == self.__class__:
                foreign_key_target_field = name

        if not foreign_key_target_field:
            # `model` is itself a class here, so `model.__class__.__name__`
            # would report its metaclass instead of the model's own name.
            model_name = getattr(model, "__name__", type(model).__name__)
            raise RelationshipNotFound(
                f"There was no relationship found between '{model_name}' and {self.__class__.__name__}"
            )

        data = reference.model_dump(exclude={"__model__"})
        data[foreign_key_target_field] = self
        await model.query.create(**data)

update_model_references

update_model_references(**kwargs)
PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
146
147
148
149
150
151
152
153
154
155
156
157
158
def update_model_references(self, **kwargs: Any) -> Any:
    """
    Split `kwargs` into regular values and model reference values.

    A key counts as a model reference when it is present in
    `self.meta.model_references`.

    Returns:
        A `(remaining_kwargs, model_references)` tuple of two dicts, each
        keeping the order in which the keys were passed.
    """
    model_references = {
        name: value for name, value in kwargs.items() if name in self.meta.model_references
    }
    remaining = {name: value for name, value in kwargs.items() if name not in model_references}
    return remaining, model_references

save async

save(force_save=False, values=None, **kwargs)

Performs a save of a given model instance. When creating a user it will make sure it can update existing or create a new one.

PARAMETER DESCRIPTION
force_save

TYPE: bool DEFAULT: False

values

TYPE: Dict[str, Any] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def save(
    self,
    force_save: bool = False,
    values: Dict[str, Any] = None,
    **kwargs: Any,
) -> Union["Model", Any]:
    """
    Persist this instance, either by inserting a new row or by updating the
    existing one.

    An insert is performed when `force_save` is true or when an autoincrement
    primary key column has no value on the instance; otherwise the instance is
    updated. Model references found among the values are saved afterwards.

    Parameters:
        force_save: Request an insert instead of an update.
        values: Optional mapping. On insert it is merged over the extracted
            fields; on update it replaces them and the update is treated as
            partial.
        **kwargs: Incoming keyword arguments are not read; the name is
            rebound to the values that are written.

    Returns:
        This instance.
    """
    await self.signals.pre_save.send_async(self.__class__, instance=self)

    extracted_fields = self.extract_db_fields()

    for pkcolumn in self.__class__.pkcolumns:
        # should trigger load in case of identifying_db_fields
        # An unset autoincrement primary key is dropped from the values and
        # switches this call to the insert path.
        if getattr(self, pkcolumn, None) is None and self.table.columns[pkcolumn].autoincrement:
            extracted_fields.pop(pkcolumn, None)
            force_save = True

    if force_save:
        if values:
            extracted_fields.update(values)
        # force save must ensure a complete mapping
        validated_values = self._extract_values_from_field(
            extracted_values=extracted_fields, is_partial=False
        )
        kwargs = self._update_auto_now_fields(values=validated_values, fields=self.fields)
        # Model references are split off; they are not part of the insert.
        kwargs, model_references = self.update_model_references(**kwargs)
        await self._save(**kwargs)
    else:
        # Broadcast the initial update details
        # Making sure it only updates the fields that should be updated
        # and excludes the fields with `auto_now` as true
        validated_values = self._extract_values_from_field(
            extracted_values=extracted_fields if values is None else values,
            is_update=True,
            is_partial=values is not None,
        )
        kwargs, model_references = self.update_model_references(**validated_values)
        # Keep only the validated values that are not model references.
        update_model = {k: v for k, v in validated_values.items() if k in kwargs}

        # NOTE(review): update() in this file also sends pre_update and
        # post_update itself, so receivers appear to be notified twice on this
        # path — confirm this is intended.
        await self.signals.pre_update.send_async(self.__class__, instance=self, kwargs=update_model)
        await self.update(**update_model)

        # Broadcast the update complete
        await self.signals.post_update.send_async(self.__class__, instance=self)

    # Save the model references
    if model_references:
        for model_ref, references in model_references.items():
            # A falsy reference value is treated as "no references".
            await self.save_model_references(references or [], model_ref=model_ref)

    # Refresh the results
    # Reload only when a field that was not among the extracted values
    # declares a server default.
    if any(field.server_default is not None for name, field in self.fields.items() if name not in extracted_fields):
        await self.load()

    await self.signals.post_save.send_async(self.__class__, instance=self)
    return self