Skip to content

Model class

edgy.Model

Model(*args, __show_pk__=False, __phase__='init', **kwargs)

Bases: ModelRowMixin, DeclarativeMixin, EdgyBaseModel

Representation of an Edgy Model.

This also means it can generate declarative SQLAlchemy models from anywhere by calling the Model.declarative() function.

Example

import edgy
from edgy import Database, Registry

# Connection and registry shared by the models declared below.
database = Database("sqlite:///db.sqlite")
models = Registry(database=database)


class User(edgy.Model):
    '''
    The User model to be created in the database as a table.
    If no name is provided in the Meta class, it will generate
    a "users" table for you.
    '''

    id: int = edgy.IntegerField(primary_key=True)
    is_active: bool = edgy.BooleanField(default=False)

    class Meta:
        # attach the model to the registry created above
        registry = models
PARAMETER DESCRIPTION
*args

TYPE: Any DEFAULT: ()

__show_pk__

TYPE: bool DEFAULT: False

__phase__

TYPE: str DEFAULT: 'init'

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/base.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __init__(
    self, *args: Any, __show_pk__: bool = False, __phase__: str = "init", **kwargs: Any
) -> None:
    """
    Initialize the model from keyword values and positional ModelRefs.

    Every positional ``ModelRef`` is turned into an instance of the model on the
    other side of the relation named by its ``__related_name__`` and appended to
    whatever was passed for that relation in ``kwargs``. Positional arguments of
    any other type are ignored.

    Args:
        *args: Optional ``ModelRef`` objects to inject into relation fields.
        __show_pk__: Stored on the instance as ``__show_pk__``.
        __phase__: Phase name forwarded to ``transform_input``.
        **kwargs: Values for the model.
    """
    # inject anonymous ModelRefs (passed without a field) into their relation fields
    for ref in args:
        if not isinstance(ref, ModelRef):
            continue
        related_name = ref.__related_name__
        relation_field = self.meta.fields[related_name]
        try:
            # m2m or foreign key
            target_model_class = relation_field.target
        except AttributeError:
            # reverse m2m or foreign key
            target_model_class = relation_field.related_from
        # sometimes the foreign key is required, so set it already (not for m2m)
        extra_params = (
            {} if relation_field.is_m2m else {relation_field.foreign_key.name: self}
        )
        model = target_model_class(
            **ref.model_dump(exclude={"__related_name__"}),
            **extra_params,
        )
        current: Any = kwargs.get(related_name)
        if isinstance(current, Sequence):
            merged = [*current, model]
        elif current is None:
            merged = [model]
        else:
            # a single value was passed: wrap it together with the new model
            merged = [current, model]
        kwargs[related_name] = merged

    kwargs = self.transform_input(kwargs, phase=__phase__, instance=self)
    super().__init__(**kwargs)
    # restrict to fields
    # TODO: maybe replaceable by direct setting kwargs but doesn't work yet
    self.__dict__ = self.setup_model_from_kwargs(kwargs)
    self.__show_pk__ = __show_pk__
    # always set them in __dict__ to prevent __getattr__ loop
    self._loaded_or_deleted = False

columns class-attribute

columns

database property writable

database

table property writable

table

pkcolumns property

pkcolumns

pknames property

pknames

query class-attribute

query = Manager()
query_related = RedirectManager(redirect_name='query')

meta class-attribute

meta = MetaInfo(None, abstract=True)

_db_schemas class-attribute

_db_schemas

__parent__ class-attribute

__parent__ = None

__is_proxy_model__ class-attribute

__is_proxy_model__ = False

__require_model_based_deletion__ class-attribute

__require_model_based_deletion__ = False

__reflected__ class-attribute

__reflected__ = False

proxy_model cached property

proxy_model

identifying_db_fields cached property

identifying_db_fields

The columns used for loading. They can be set per instance and default to pknames.

can_load property

can_load

__proxy_model__ class-attribute

__proxy_model__ = None

__db_model__ class-attribute

__db_model__ = False

__show_pk__ class-attribute instance-attribute

__show_pk__ = __show_pk__

_loaded_or_deleted class-attribute instance-attribute

_loaded_or_deleted = False

__using_schema__ class-attribute instance-attribute

__using_schema__ = Undefined

signals property

signals

fields property

fields

Meta

abstract class-attribute instance-attribute

abstract = True

get_columns_for_name

get_columns_for_name(name)
PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
290
291
292
293
294
295
296
297
298
def get_columns_for_name(self, name: str) -> Sequence["sqlalchemy.Column"]:
    """
    Return the columns registered under ``name``.

    The field-to-columns mapping of the model meta is consulted first; if the
    name is not a key there but is a column of the table, a one-element tuple
    holding that column is returned. For any other name the module-level
    ``_empty`` value is returned.
    """
    table = self.table
    field_columns = self.meta.field_to_columns
    if name in field_columns:
        return field_columns[name]
    if name in table.columns:
        return (table.columns[name],)
    return cast(Sequence["sqlalchemy.Column"], _empty)

identifying_clauses

identifying_clauses()
Source code in edgy/core/db/models/base.py
300
301
302
303
304
305
306
307
308
309
310
311
def identifying_clauses(self) -> list[Any]:
    """
    Build the comparison clauses for the identifying db fields of this instance.

    For a name that is a declared field, the stored value is passed through
    ``field.clean`` and one ``column == value`` clause is produced per returned
    entry. Any other name is compared directly against the table column of the
    same name.
    """
    clauses: list[Any] = []
    for name in self.identifying_db_fields:
        field = self.meta.fields.get(name)
        if field is None:
            # not a declared field: compare the equally named column with the raw value
            clauses.append(getattr(self.table.columns, name) == self.__dict__[name])
            continue
        cleaned = field.clean(name, self.__dict__[name])
        clauses.extend(
            getattr(self.table.columns, column) == value for column, value in cleaned.items()
        )
    return clauses

load_recursive async

load_recursive(only_needed=False, only_needed_nest=False, _seen=None)
PARAMETER DESCRIPTION
only_needed

TYPE: bool DEFAULT: False

only_needed_nest

TYPE: bool DEFAULT: False

_seen

TYPE: Optional[set[Any]] DEFAULT: None

Source code in edgy/core/db/models/base.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
async def load_recursive(
    self,
    only_needed: bool = False,
    only_needed_nest: bool = False,
    _seen: Optional[set[Any]] = None,
) -> None:
    """
    Load this instance and then the instances referenced by its foreign keys.

    Args:
        only_needed: Passed on to ``load``.
        only_needed_nest: When true, do not descend further if this instance was
            already flagged as loaded or deleted before this call.
        _seen: Model keys visited so far; guards against loading an instance twice.
    """
    key = self.create_model_key()
    if _seen is None:
        _seen = {key}
    elif key in _seen:
        # already handled during this traversal
        return
    else:
        _seen.add(key)

    # remember the state from before loading, it decides whether to descend
    was_loaded = self._loaded_or_deleted
    if self.can_load:
        await self.load(only_needed)
    if only_needed_nest and was_loaded:
        return

    for fk_name in self.meta.foreign_key_fields:
        related = getattr(self, fk_name, None)
        if related is None:
            continue
        # if a subinstance is fully loaded stop
        await related.load_recursive(
            only_needed=only_needed, only_needed_nest=True, _seen=_seen
        )

model_dump

model_dump(show_pk=None, **kwargs)

An updated version of the model dump. It can show the pk always and handles the exclude attribute on fields correctly and contains the custom logic for fields with getters

PARAMETER DESCRIPTION
show_pk

TYPE: Union[bool, None] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Extra Args

show_pk: bool - Enforces showing the primary key in the model_dump.

Source code in edgy/core/db/models/base.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def model_dump(self, show_pk: Union[bool, None] = None, **kwargs: Any) -> dict[str, Any]:
    """
    An updated version of the model dump.
    It can show the pk always and handles the exclude attribute on fields correctly and
    contains the custom logic for fields with getters

    Extra Args:
        show_pk: bool - Enforces showing the primary key in the model_dump.

    Recognized keyword arguments are ``exclude``, ``include`` and ``mode``
    (default ``"python"``); the remaining ones are passed on unchanged to the
    parent ``model_dump``.

    Returns:
        The dumped model as a dictionary.
    """
    # we want a copy
    exclude: Union[set[str], dict[str, Any], None] = kwargs.pop("exclude", None)
    if exclude is None:
        initial_full_field_exclude = _empty
        # must be writable
        exclude = set()
    elif isinstance(exclude, dict):
        # only entries mapped to True exclude a whole field, others are sub filters
        initial_full_field_exclude = {k for k, v in exclude.items() if v is True}
        exclude = copy.copy(exclude)
    else:
        initial_full_field_exclude = set(exclude)
        exclude = copy.copy(initial_full_field_exclude)

    if isinstance(exclude, dict):
        # exclude __show_pk__ attribute from showing up
        exclude["__show_pk__"] = True
        for field_name in self.meta.excluded_fields:
            exclude[field_name] = True
    else:
        exclude.update(self.meta.special_getter_fields)
        exclude.update(self.meta.excluded_fields)
        # exclude __show_pk__ attribute from showing up
        exclude.add("__show_pk__")
    include: Union[set[str], dict[str, Any], None] = kwargs.pop("include", None)
    mode: Union[Literal["json", "python"], str] = kwargs.pop("mode", "python")

    should_show_pk = show_pk or self.__show_pk__
    model = dict(super().model_dump(exclude=exclude, include=include, mode=mode, **kwargs))
    # Workaround for metafields, computed field logic introduces many problems
    # so reimplement the logic here
    for field_name in self.meta.special_getter_fields:
        if field_name == "pk":
            continue
        field: BaseFieldType = self.meta.fields[field_name]
        # pk fields bypass the include/exclude filters when the pk must be shown
        if not should_show_pk or field_name not in self.pknames:
            if field_name in initial_full_field_exclude:
                continue
            if include is not None and field_name not in include:
                continue
            # the exclude flag lives on the field object, not on its name
            if getattr(field, "exclude", False):
                continue
        try:
            retval = field.__get__(self, self.__class__)
        except AttributeError:
            # the getter has no value to offer, skip the field
            continue
        sub_include = None
        if isinstance(include, dict):
            sub_include = include.get(field_name, None)
            if sub_include is True:
                sub_include = None
        sub_exclude = None
        if isinstance(exclude, dict):
            sub_exclude = exclude.get(field_name, None)
            if sub_exclude is True:
                sub_exclude = None
        if isinstance(retval, BaseModel):
            retval = retval.model_dump(
                include=sub_include, exclude=sub_exclude, mode=mode, **kwargs
            )
        else:
            assert (
                sub_include is None
            ), "sub include filters for CompositeField specified, but no Pydantic model is set"
            assert (
                sub_exclude is None
            ), "sub exclude filters for CompositeField specified, but no Pydantic model is set"
            if mode == "json" and not getattr(field, "unsafe_json_serialization", False):
                # skip field if it isn't a BaseModel and the mode is json and unsafe_json_serialization is not set
                # currently unsafe_json_serialization exists only on CompositeFields
                continue
        # serialization_alias takes precedence over alias, the field name is the fallback
        alias: str = field_name
        if getattr(field, "serialization_alias", None):
            alias = cast(str, field.serialization_alias)
        elif getattr(field, "alias", None):
            alias = field.alias
        model[alias] = retval
    # tenants cause excluded fields to reappear
    # TODO: find a better bugfix
    for excluded_field in self.meta.excluded_fields:
        model.pop(excluded_field, None)
    return model

build classmethod

build(schema=None, metadata=None)

Builds the SQLAlchemy table representation from the loaded fields.

PARAMETER DESCRIPTION
schema

TYPE: Optional[str] DEFAULT: None

metadata

TYPE: Optional[MetaData] DEFAULT: None

Source code in edgy/core/db/models/base.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
@classmethod
def build(
    cls, schema: Optional[str] = None, metadata: Optional[sqlalchemy.MetaData] = None
) -> sqlalchemy.Table:
    """
    Builds the SQLAlchemy table representation from the loaded fields.

    Args:
        schema: Schema for the table; the registry's ``db_schema`` is used when empty.
        metadata: Metadata to attach the table to; defaults to the registry metadata.

    Returns:
        The table assembled from the field columns, the unique constraints, the
        indexes and the constraints contributed by the fields.
    """
    tablename: str = cls.meta.tablename
    registry = cls.meta.registry
    assert registry is not None, "registry is not set"
    if metadata is None:
        metadata = registry.metadata

    # explicit schema first (when given), followed by the registry schema or ""
    schemes: list[str] = [schema] if schema else []
    schemes.append(registry.db_schema or "")

    unique_together = cls.meta.unique_together
    index_constraints = cls.meta.indexes

    columns: list[sqlalchemy.Column] = []
    global_constraints: list[Any] = []
    for field_name, field in cls.meta.fields.items():
        field_columns = field.get_columns(field_name)
        columns.extend(field_columns)
        global_constraints.extend(
            field.get_global_constraints(field_name, field_columns, schemes)
        )

    # Handle the uniqueness together
    uniques = [cls._get_unique_constraints(entry) for entry in unique_together or []]

    # Handle the indexes
    indexes = [cls._get_indexes(entry) for entry in index_constraints or []]

    return sqlalchemy.Table(
        tablename,
        metadata,
        *columns,
        *uniques,
        *indexes,
        *global_constraints,
        extend_existing=True,
        schema=schema or registry.db_schema,
    )

execute_post_save_hooks async

execute_post_save_hooks(fields, force_insert)
PARAMETER DESCRIPTION
fields

TYPE: Sequence[str]

force_insert

TYPE: bool

Source code in edgy/core/db/models/base.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
async def execute_post_save_hooks(self, fields: Sequence[str], force_insert: bool) -> None:
    """
    Run ``post_save_callback`` for every post-save field contained in ``fields``.

    Fields whose value cannot be read (``AttributeError``) are skipped.

    Args:
        fields: Names of the fields to consider.
        force_insert: Forwarded to each callback.
    """
    affected = self.meta.post_save_fields.intersection(fields)
    if not affected:
        return
    # don't trigger loads, AttributeErrors are used for skipping fields
    behavior_token = MODEL_GETATTR_BEHAVIOR.set("passdown")
    instance_token = CURRENT_MODEL_INSTANCE.set(self)
    try:
        for name in affected:
            field = self.meta.fields[name]
            try:
                value = getattr(self, name)
            except AttributeError:
                continue
            await field.post_save_callback(value, instance=self, force_insert=force_insert)
    finally:
        MODEL_GETATTR_BEHAVIOR.reset(behavior_token)
        CURRENT_MODEL_INSTANCE.reset(instance_token)

execute_pre_save_hooks async

execute_pre_save_hooks(column_values, original, force_insert)
PARAMETER DESCRIPTION
column_values

TYPE: dict[str, Any]

original

TYPE: dict[str, Any]

force_insert

TYPE: bool

Source code in edgy/core/db/models/base.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
async def execute_pre_save_hooks(
    self, column_values: dict[str, Any], original: dict[str, Any], force_insert: bool
) -> dict[str, Any]:
    """
    Run ``pre_save_callback`` for the pre-save fields present in either mapping.

    Args:
        column_values: Values keyed by field name; passed as first callback argument.
        original: Values keyed by field name; passed as second callback argument.
        force_insert: Forwarded to each callback.

    Returns:
        The merged dictionaries returned by the callbacks.
    """
    # also handle defaults
    keys = set(column_values) | set(original)
    affected = self.meta.pre_save_fields.intersection(keys)
    retdict: dict[str, Any] = {}
    if not affected:
        return retdict
    # don't trigger loads
    behavior_token = MODEL_GETATTR_BEHAVIOR.set("passdown")
    instance_token = CURRENT_MODEL_INSTANCE.set(self)
    try:
        for name in affected:
            if name not in column_values and name not in original:
                continue
            field = self.meta.fields[name]
            update = await field.pre_save_callback(
                column_values.get(name),
                original.get(name),
                force_insert=force_insert,
                instance=self,
            )
            retdict.update(update)
    finally:
        MODEL_GETATTR_BEHAVIOR.reset(behavior_token)
        CURRENT_MODEL_INSTANCE.reset(instance_token)
    return retdict

extract_column_values classmethod

extract_column_values(extracted_values, is_update=False, is_partial=False, phase='', instance=None, model_instance=None)
PARAMETER DESCRIPTION
extracted_values

TYPE: dict[str, Any]

is_update

TYPE: bool DEFAULT: False

is_partial

TYPE: bool DEFAULT: False

phase

TYPE: str DEFAULT: ''

instance

TYPE: Optional[Union[BaseModelType, QuerySet]] DEFAULT: None

model_instance

TYPE: Optional[BaseModelType] DEFAULT: None

Source code in edgy/core/db/models/base.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
@classmethod
def extract_column_values(
    cls,
    extracted_values: dict[str, Any],
    is_update: bool = False,
    is_partial: bool = False,
    phase: str = "",
    instance: Optional[Union["BaseModelType", "QuerySet"]] = None,
    model_instance: Optional["BaseModelType"] = None,
) -> dict[str, Any]:
    """
    Clean the given values with the model fields and fill in defaults.

    Args:
        extracted_values: Input values keyed by field name.
        is_update: Together with a field's ``inject_default_on_partial_update``
            it enables defaults for partial operations.
        is_partial: When true, defaults are only injected for fields opting in.
        phase: Value set on ``CURRENT_PHASE`` while the fields are processed.
        instance: Value set on ``CURRENT_INSTANCE`` while the fields are processed.
        model_instance: Value set on ``CURRENT_MODEL_INSTANCE`` while the fields
            are processed.

    Returns:
        The cleaned values plus the injected defaults.

    Raises:
        ValueError: If two fields produce a value for the same key.
    """

    def _wants_default(candidate: Any) -> bool:
        # defaults apply to non-partial operations or to opted-in partial updates
        applies = not is_partial or (
            candidate.inject_default_on_partial_update and is_update
        )
        return bool(applies and candidate.has_default())

    validated: dict[str, Any] = {}
    phase_token = CURRENT_PHASE.set(phase)
    instance_token = CURRENT_INSTANCE.set(instance)
    model_instance_token = CURRENT_MODEL_INSTANCE.set(model_instance)
    try:
        # phase 1: transform when required
        if cls.meta.input_modifying_fields:
            # work on a copy, the fields modify the mapping
            extracted_values = {**extracted_values}
            for name in cls.meta.input_modifying_fields:
                cls.meta.fields[name].modify_input(name, extracted_values)

        # phase 2: validate fields and set defaults for readonly
        pending_defaults: list[BaseFieldType] = []
        for name, field in cls.meta.fields.items():
            if field.read_only:
                # if read_only, updates are not possible anymore
                if _wants_default(field):
                    validated.update(field.get_default_values(name, validated))
                continue
            if name in extracted_values:
                assert field.owner
                for sub_name, value in field.clean(name, extracted_values[name]).items():
                    if sub_name in validated:
                        raise ValueError(f"value set twice for key: {sub_name}")
                    validated[sub_name] = value
            elif _wants_default(field):
                # add field without a value to the second pass (in case no value appears)
                # only include fields which have inject_default_on_partial_update set or if not is_partial
                pending_defaults.append(field)

        # phase 3: set defaults for the rest if not partial or inject_default_on_partial_update
        for field in pending_defaults:
            # check if field appeared e.g. by composite
            # Note: default values are directly passed without validation
            if field.name not in validated:
                validated.update(field.get_default_values(field.name, validated))
    finally:
        CURRENT_MODEL_INSTANCE.reset(model_instance_token)
        CURRENT_INSTANCE.reset(instance_token)
        CURRENT_PHASE.reset(phase_token)
    return validated

extract_db_fields

extract_db_fields(only=None)

Extracts all the db fields, model references and fields. Related fields are not included because they are disjoint.

PARAMETER DESCRIPTION
only

TYPE: Optional[Sequence[str]] DEFAULT: None

Source code in edgy/core/db/models/types.py
183
184
185
186
187
188
189
190
191
192
193
194
def extract_db_fields(self, only: Optional[Sequence[str]] = None) -> dict[str, Any]:
    """
    Extracts all the db fields, model references and fields.
    Related fields are not included because they are disjoint.

    Args:
        only: When given, only the instance attributes named here are returned.

    Returns:
        The matching entries of the instance ``__dict__``.
    """
    fields = self.meta.fields
    columns = self.table.columns

    if only is None:

        def _wanted(key: str) -> bool:
            # keep declared fields and attributes named like a table column
            return key in fields or hasattr(columns, key)

    else:

        def _wanted(key: str) -> bool:
            return key in only

    return {key: value for key, value in self.__dict__.items() if _wanted(key)}

get_instance_name

get_instance_name()

Returns the name of the class in lowercase.

Source code in edgy/core/db/models/types.py
196
197
198
199
200
def get_instance_name(self) -> str:
    """
    Return the lowercased class name of the instance.
    """
    class_name: str = self.__class__.__name__
    return class_name.lower()

create_model_key

create_model_key()

Build a cache key for the model.

Source code in edgy/core/db/models/types.py
202
203
204
205
206
207
208
209
210
def create_model_key(self) -> tuple:
    """
    Build a cache key for the model.

    The key is the class name followed by the stringified value of every
    attribute listed in ``pkcolumns``.
    """
    # there are no columns, only column results
    return (
        type(self).__name__,
        *(str(getattr(self, attr)) for attr in self.pkcolumns),
    )

transform_input classmethod

transform_input(kwargs, phase='', instance=None)

Expand to_models and apply input modifications.

PARAMETER DESCRIPTION
kwargs

TYPE: Any

phase

TYPE: str DEFAULT: ''

instance

TYPE: Optional[BaseModelType] DEFAULT: None

Source code in edgy/core/db/models/base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@classmethod
def transform_input(
    cls, kwargs: Any, phase: str = "", instance: Optional["BaseModelType"] = None
) -> Any:
    """
    Expand to_models and apply input modifications.

    Args:
        kwargs: Input values; a copy is modified, the passed mapping stays untouched.
        phase: Value set on ``CURRENT_PHASE`` while the fields are processed.
        instance: Value set on ``CURRENT_INSTANCE`` and ``CURRENT_MODEL_INSTANCE``
            while the fields are processed.

    Returns:
        A new dictionary with the ``to_model`` results of known fields and the
        unchanged values of unknown keys.
    """
    working = {**kwargs}
    expanded: dict[str, Any] = {}

    fields = cls.meta.fields
    instance_token = CURRENT_INSTANCE.set(instance)
    model_instance_token = CURRENT_MODEL_INSTANCE.set(instance)
    phase_token = CURRENT_PHASE.set(phase)
    try:
        # phase 1: transform
        # Note: this is order dependent. There should be no overlap.
        for name in cls.meta.input_modifying_fields:
            fields[name].modify_input(name, working)
        # phase 2: apply to_model
        for key, value in working.items():
            field = fields.get(key, None)
            if field is None:
                # unknown keys are passed through untouched
                expanded[key] = value
            else:
                expanded.update(**field.to_model(key, value))
    finally:
        CURRENT_PHASE.reset(phase_token)
        CURRENT_MODEL_INSTANCE.reset(model_instance_token)
        CURRENT_INSTANCE.reset(instance_token)
    return expanded

setup_model_from_kwargs

setup_model_from_kwargs(kwargs)

Loops over the kwargs and keeps only the entries that are fields of the model.

PARAMETER DESCRIPTION
kwargs

TYPE: Any

Source code in edgy/core/db/models/base.py
140
141
142
143
144
145
def setup_model_from_kwargs(self, kwargs: Any) -> Any:
    """
    Return the entries of ``kwargs`` whose key is a field of the model.
    """
    selected = {}
    for key, value in kwargs.items():
        if key in self.meta.fields:
            selected[key] = value
    return selected

copy_edgy_model classmethod

copy_edgy_model(registry=None, name='', **kwargs)

Copy the model class and optionally add it to another registry.

PARAMETER DESCRIPTION
registry

TYPE: Optional[Registry] DEFAULT: None

name

TYPE: str DEFAULT: ''

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/base.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@classmethod
def copy_edgy_model(
    cls, registry: Optional["Registry"] = None, name: str = "", **kwargs: Any
) -> type["Model"]:
    """
    Copy the model class and optionally add it to another registry.

    Args:
        registry: Registry the copy is added to; skipped when ``None``.
        name: New ``__name__`` for the copy; the original name is kept when empty.
        **kwargs: Extra keyword arguments for the class creation.

    Returns:
        The newly created model class.
    """
    # removes private pydantic stuff, except the prefixed ones
    attrs = {}
    for key, val in cls.__dict__.items():
        if key.startswith("__") or key not in BaseModel.__dict__:
            attrs[key] = val
    attrs.pop("meta", None)
    # managers and fields are gone, we have to readd them with the correct data
    attrs.update(cls.meta.fields)
    attrs.update(cls.meta.managers)
    new_class = type(cls.__name__, cls.__bases__, attrs, skip_registry=True, **kwargs)
    model_copy = cast(type["Model"], new_class)
    # point the meta of the copy at the copy itself
    model_copy.meta.model = model_copy
    if name:
        model_copy.__name__ = name
    if registry is not None:
        model_copy.add_to_registry(registry)
    return model_copy

_get_unique_constraints classmethod

_get_unique_constraints(columns)

Returns the unique constraints for the model.

The columns must be a list or tuple of strings, a single string, or a UniqueConstraint object.

:return: Model UniqueConstraint.

PARAMETER DESCRIPTION
columns

TYPE: Union[Sequence, str, UniqueConstraint]

Source code in edgy/core/db/models/base.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
@classmethod
def _get_unique_constraints(
    cls, columns: Union[Sequence, str, sqlalchemy.UniqueConstraint]
) -> Optional[sqlalchemy.UniqueConstraint]:
    """
    Returns the unique constraint for the given definition.

    The columns must be a list or tuple of strings, a single string or a
    UniqueConstraint object.

    :return: Model UniqueConstraint.
    """
    constraint_kwargs: dict[str, Any] = {}
    if isinstance(columns, str):
        # a single column name
        column_names: Any = (columns,)
    elif isinstance(columns, UniqueConstraint):
        # the constraint object carries its fields and its name
        column_names = columns.fields
        constraint_kwargs["name"] = columns.name
    else:
        column_names = columns
    return sqlalchemy.UniqueConstraint(*column_names, **constraint_kwargs)

_get_indexes classmethod

_get_indexes(index)

Creates the index based on the Index fields

PARAMETER DESCRIPTION
index

TYPE: Index

Source code in edgy/core/db/models/base.py
470
471
472
473
474
475
@classmethod
def _get_indexes(cls, index: Index) -> Optional[sqlalchemy.Index]:
    """
    Create the SQLAlchemy index from the name and the fields of ``index``.
    """
    index_name, index_fields = index.name, index.fields
    return sqlalchemy.Index(index_name, *index_fields)

__setattr__

__setattr__(key, value)
PARAMETER DESCRIPTION
key

TYPE: str

value

TYPE: Any

Source code in edgy/core/db/models/base.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def __setattr__(self, key: str, value: Any) -> None:
    """
    Assign ``value`` to ``key``, routing assignments to fields through the field.

    A field that defines ``__set__`` handles the assignment itself. Any other
    field converts the value with ``to_model`` and every resulting entry is
    stored on the instance. Keys that are not fields are stored directly.
    """
    field = self.meta.fields.get(key, None)
    instance_token = CURRENT_INSTANCE.set(self)
    phase_token = CURRENT_PHASE.set("set")
    try:
        if field is None:
            # bypass __setattr__ method
            object.__setattr__(self, key, value)
        elif hasattr(field, "__set__"):
            # not recommended, better to use to_model instead except for kept objects
            # used in related_fields to mask and not to implement to_model
            field.__set__(self, value)
        else:
            for attr_name, attr_value in field.to_model(key, value).items():
                # bypass __setattr__ method
                object.__setattr__(self, attr_name, attr_value)
    finally:
        CURRENT_INSTANCE.reset(instance_token)
        CURRENT_PHASE.reset(phase_token)

_agetattr_helper async

_agetattr_helper(name, getter)
PARAMETER DESCRIPTION
name

TYPE: str

getter

TYPE: Any

Source code in edgy/core/db/models/base.py
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
async def _agetattr_helper(self, name: str, getter: Any) -> Any:
    """
    Load the instance and then resolve the attribute ``name``.

    Args:
        name: Attribute to read from the instance ``__dict__`` when no getter is given.
        getter: Optional callable invoked with the instance and its class; an
            awaitable result is awaited.

    Raises:
        AttributeError: If no getter is given and ``name`` is missing after loading.
    """
    await self.load()
    if getter is None:
        try:
            return self.__dict__[name]
        except KeyError:
            raise AttributeError(f"Attribute: {name} not found") from None

    behavior_token = MODEL_GETATTR_BEHAVIOR.set("coro")
    try:
        value = getter(self, self.__class__)
        if inspect.isawaitable(value):
            value = await value
        return value
    finally:
        MODEL_GETATTR_BEHAVIOR.reset(behavior_token)

__getattr__

__getattr__(name)

Does the following things: (1) initializes managers on access; (2) redirects get accesses to getter fields; (3) runs a one-off query to populate any foreign key, making sure it runs only once per foreign key to avoid multiple database calls.

PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
def __getattr__(self, name: str) -> Any:
    """
    Resolve attributes that are not found by the regular lookup.

    Does the following things:
    1. Initializes managers on access.
    2. Redirects get accesses to getter fields.
    3. Runs a one-off query to populate any foreign key, making sure
       it runs only once per foreign key to avoid multiple database calls.

    The current value of ``MODEL_GETATTR_BEHAVIOR`` steers the lookup:
    with ``"passdown"`` no load is triggered, with ``"coro"`` the pending load
    is returned as a coroutine instead of being run synchronously.
    """
    behavior = MODEL_GETATTR_BEHAVIOR.get()
    manager = self.meta.managers.get(name)
    if manager is not None:
        # bind a copy of the manager to this instance once and cache it in __dict__
        if name not in self.__dict__:
            manager = copy.copy(manager)
            manager.instance = self
            self.__dict__[name] = manager
        return self.__dict__[name]

    field = self.meta.fields.get(name)
    getter: Any = None
    if field is not None and hasattr(field, "__get__"):
        getter = field.__get__
        if behavior == "coro" or behavior == "passdown":
            # already inside a guarded lookup: call the getter directly
            return field.__get__(self, self.__class__)
        else:
            # switch to "passdown" for the duration of the getter call
            token = MODEL_GETATTR_BEHAVIOR.set("passdown")
            # no need to set a descriptor object
            try:
                return field.__get__(self, self.__class__)
            except AttributeError:
                # forward to load routine
                pass
            finally:
                MODEL_GETATTR_BEHAVIOR.reset(token)
    # load only for fields which are missing and are not used to identify the instance
    if (
        name not in self.__dict__
        and behavior != "passdown"
        and not self._loaded_or_deleted
        and field is not None
        and name not in self.identifying_db_fields
        and self.can_load
    ):
        coro = self._agetattr_helper(name, getter)
        if behavior == "coro":
            # the caller awaits the load itself
            return coro
        return run_sync(coro)
    # nothing handled the name: defer to the parent implementation
    return super().__getattr__(name)

__eq__

__eq__(other)
PARAMETER DESCRIPTION
other

TYPE: Any

Source code in edgy/core/db/models/base.py
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
def __eq__(self, other: Any) -> bool:
    """
    Compare two model instances by registry, table name and primary key values.

    Objects that do not expose model metadata (``None``, plain values, unrelated
    classes) compare unequal instead of raising ``AttributeError``.

    Args:
        other: The object to compare with.

    Returns:
        True if both objects share the registry and the table name and their
        cleaned primary key values match, otherwise False.
    """
    try:
        other_meta = other.meta
        other_registry = other_meta.registry
        other_tablename = other_meta.tablename
    except AttributeError:
        # not a model: there is nothing to compare against
        return False
    # the classes are deliberately not compared:
    # somehow meta gets regenerated, so just compare tablename and registry.
    if self.meta.registry is not other_registry:
        return False
    if self.meta.tablename != other_tablename:
        return False
    self_dict = self.extract_column_values(
        self.extract_db_fields(self.pkcolumns),
        is_partial=True,
        phase="compare",
        instance=self,
        model_instance=self,
    )
    # both sides are reduced to the pk columns of this instance
    other_dict = other.extract_column_values(
        other.extract_db_fields(self.pkcolumns),
        is_partial=True,
        phase="compare",
        instance=other,
        model_instance=other,
    )
    key_set = {*self_dict.keys(), *other_dict.keys()}
    return not any(
        self_dict.get(field_name) != other_dict.get(field_name) for field_name in key_set
    )

declarative classmethod

declarative()
Source code in edgy/core/db/models/mixins/generics.py
12
13
14
@classmethod
def declarative(cls) -> Any:
    """
    Return the result of ``generate_model_declarative`` for this model class.
    """
    declarative_model = cls.generate_model_declarative()
    return declarative_model

generate_model_declarative classmethod

generate_model_declarative()

Transforms a core Edgy table into a Declarative model table.

Source code in edgy/core/db/models/mixins/generics.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@classmethod
def generate_model_declarative(cls) -> Any:
    """
    Transforms a core Edgy table into a Declarative model table.

    A class named like the model is created on top of the registry's declarative
    base and bound to the model table. For every column carrying foreign keys a
    relationship property named ``<column name>_relation`` is added.
    """
    Base = cls.meta.registry.declarative_base

    # Build the original table
    namespace = {"__table__": cls.table}

    # Generate base
    model_table = type(cls.__name__, (Base,), namespace)

    # Make sure if there are foreignkeys, builds the relationships
    for column in cls.table.columns:
        if column.foreign_keys:
            # Maps the relationships with the foreign keys and related names
            field = cls.meta.fields.get(column.name)
            # the relationship target is given by name when the field points to a class
            to = field.to if not inspect.isclass(field.to) else field.to.__name__
            mapped_model = relationship(to)  # type: ignore

            # Adds to the current model
            model_table.__mapper__.add_property(f"{column.name}_relation", mapped_model)

    return model_table

from_sqla_row async classmethod

from_sqla_row(row, select_related=None, prefetch_related=None, is_only_fields=False, only_fields=None, is_defer_fields=False, exclude_secrets=False, using_schema=None, database=None, table=None)

Class method to convert a SQLAlchemy Row result into an EdgyModel row type.

Loops through the select_related fields if the query comes from a select_related operation. Validates that each select_related entry and its related field exist on the models.

When select_related and related_field exist for the same field being validated, the related field is ignored as it won't override the value already collected from the select_related.

If there is no select_related, then it goes through the related field, where it should only return the instance of the ForeignKey with the ID, making it lazy loaded.

:return: Model class.

PARAMETER DESCRIPTION
row

TYPE: Row

select_related

TYPE: Optional[Sequence[Any]] DEFAULT: None

prefetch_related

TYPE: Optional[Sequence[Prefetch]] DEFAULT: None

is_only_fields

TYPE: bool DEFAULT: False

only_fields

TYPE: Sequence[str] DEFAULT: None

is_defer_fields

TYPE: bool DEFAULT: False

exclude_secrets

TYPE: bool DEFAULT: False

using_schema

TYPE: Optional[str] DEFAULT: None

database

TYPE: Optional[Database] DEFAULT: None

table

TYPE: Optional[Table] DEFAULT: None

Source code in edgy/core/db/models/mixins/row.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@classmethod
async def from_sqla_row(
    cls: type["Model"],
    row: "Row",
    select_related: Optional[Sequence[Any]] = None,
    prefetch_related: Optional[Sequence["Prefetch"]] = None,
    is_only_fields: bool = False,
    only_fields: Optional[Sequence[str]] = None,
    is_defer_fields: bool = False,
    exclude_secrets: bool = False,
    using_schema: Optional[str] = None,
    database: Optional["Database"] = None,
    # local only parameter
    table: Optional["Table"] = None,
) -> Optional["Model"]:
    """
    Class method to convert a SQLAlchemy Row result into an instance of this model.

    Loops through the select_related paths if the query comes from a select_related
    operation and validates that the first segment of every path is an existing
    RelationshipField of the model. The related model is built recursively from the
    same row.

    When select_related and a foreign key field refer to the same field, the foreign
    key is ignored as it must not override the value already collected from the
    select_related.

    If there is no select_related for a foreign key, a proxy model of the target is
    created from the foreign key columns found in the row; it **should** only carry the
    identifying values, making it lazy loaded.

    :param row: The SQLAlchemy row to read the column values from.
    :param select_related: Relationship paths (segments joined by "__") to build from the row.
    :param prefetch_related: Prefetch definitions applied to the created model.
    :param is_only_fields: Restrict the regular columns to `only_fields` (or, when that is
        empty, to the keys of the row mapping).
    :param only_fields: Names of the columns to keep when `is_only_fields` is set.
    :param is_defer_fields: When set, a proxy model is instantiated instead of the model.
    :param exclude_secrets: Skip the columns of secret fields and secret foreign keys; a
        proxy model is instantiated instead of the model.
    :param using_schema: Schema used to look up the table columns and passed to
        `apply_instance_extras`.
    :param database: Database passed to `apply_instance_extras`.
    :param table: Table passed to `apply_instance_extras` for the main model only.
    :raises QuerySetError: If a select_related path starts with an unknown field or with
        a field that is not a RelationshipField.
    :return: The model (or proxy model) instance built from the row.
    """
    item: dict[str, Any] = {}
    select_related = select_related or []
    prefetch_related = prefetch_related or []
    secret_columns: set[str] = set()
    if exclude_secrets:
        for name in cls.meta.secret_fields:
            secret_columns.update(cls.meta.field_to_column_names[name])

    for related in select_related:
        # Only the first path segment belongs to this model; the rest is handled recursively.
        field_name = related.split("__", 1)[0]
        try:
            field = cls.meta.fields[field_name]
        except KeyError:
            raise QuerySetError(
                detail=f'Selected field "{field_name}" does not exist on {cls}.'
            ) from None
        if isinstance(field, RelationshipField):
            model_class, _, remainder = field.traverse_field(related)
        else:
            raise QuerySetError(
                detail=f'Selected field "{field_name}" is not a RelationshipField on {cls}.'
            ) from None
        if remainder:
            # don't pass table, it is only for the main model_class
            item[field_name] = await model_class.from_sqla_row(
                row,
                select_related=[remainder],
                prefetch_related=prefetch_related,
                exclude_secrets=exclude_secrets,
                using_schema=using_schema,
                database=database,
            )
        else:
            # don't pass table, it is only for the main model_class
            item[field_name] = await model_class.from_sqla_row(
                row,
                exclude_secrets=exclude_secrets,
                using_schema=using_schema,
                database=database,
            )
    table_columns = cls.table_schema(using_schema).columns
    # Populate the related names
    # Making sure if the model being queried is not inside a select related
    # This way it is not overwritten by any value
    for related in cls.meta.foreign_key_fields:
        foreign_key = cls.meta.fields[related]
        ignore_related: bool = cls.__should_ignore_related_name(related, select_related)
        if ignore_related or related in cls.meta.secret_fields:
            continue
        if related in item:
            continue

        if exclude_secrets and foreign_key.secret:
            continue
        columns_to_check = foreign_key.get_column_names(related)

        model_related = foreign_key.target

        child_item = {}
        for column_name in columns_to_check:
            column = getattr(table_columns, column_name, None)
            if column is not None and column in row._mapping:
                child_item[foreign_key.from_fk_field_name(related, column_name)] = (
                    row._mapping[column]
                )
        # Make sure we generate a temporary reduced model
        # For the related fields. We simply change the structure of the model
        # and rebuild it with the new fields.
        proxy_model = model_related.proxy_model(**child_item)
        # don't pass table, it is only for the main model_class
        proxy_database = database if model_related.database is cls.database else None
        proxy_model = apply_instance_extras(
            proxy_model, model_related, using_schema, database=proxy_database
        )
        proxy_model.identifying_db_fields = foreign_key.related_columns

        item[related] = proxy_model

    # Check for the only_fields
    _is_only = set()
    if is_only_fields:
        _is_only = {str(field) for field in (only_fields or row._mapping.keys())}
    # Pull out the regular column values.
    for column in table_columns:
        # Making sure when a table is reflected, maps the right fields of the ReflectModel
        if _is_only and column.key not in _is_only:
            continue
        if column.key in secret_columns:
            continue
        if column.key not in cls.meta.columns_to_field:
            continue
        # set if not of an foreign key with one column
        elif column.key not in item:
            if column in row._mapping:
                item[column.key] = row._mapping[column]
            elif column.name in row._mapping:
                # fallback, sometimes the column is not found
                item[column.key] = row._mapping[column.name]
    # A full model is only built when nothing was excluded, deferred or restricted;
    # otherwise the proxy model is used.
    model: Model = (
        cls(**item, __phase__="init_db")  # type: ignore
        if not exclude_secrets and not is_defer_fields and not _is_only
        else cls.proxy_model(**item)
    )
    # Apply the schema to the model
    model = apply_instance_extras(model, cls, using_schema, database=database, table=table)

    # Handle prefetch related fields.
    await cls.__handle_prefetch_related(
        row=row, model=model, prefetch_related=prefetch_related
    )
    assert model.pk is not None, model
    return model
__should_ignore_related_name(related_name, select_related)

Validates if it should populate the related field if select related is not considered.

PARAMETER DESCRIPTION
related_name

TYPE: str

select_related

TYPE: Sequence[str]

Source code in edgy/core/db/models/mixins/row.py
165
166
167
168
169
170
171
172
173
174
175
176
@classmethod
def __should_ignore_related_name(
    cls, related_name: str, select_related: Sequence[str]
) -> bool:
    """
    Tell whether `related_name` is a segment of any select_related path.

    Each path is split on "__" and the name must match one segment exactly.
    """
    return any(related_name in path.split("__") for path in select_related)

create_model_key_from_sqla_row classmethod

create_model_key_from_sqla_row(row)

Build a cache key for the model.

PARAMETER DESCRIPTION
row

TYPE: Row

Source code in edgy/core/db/models/mixins/row.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
@classmethod
def create_model_key_from_sqla_row(
    cls,
    row: "Row",
) -> tuple:
    """
    Build a cache key for the model.

    The key is the class name followed by the stringified value of every
    primary key column, read from the row mapping.
    """

    def _pk_value(pk_name: str) -> Any:
        mapping = row._mapping
        try:
            # Preferred lookup: by the column object of the model table.
            return mapping[getattr(cls.table.columns, pk_name)]
        except KeyError:
            # Fall back to a lookup by the plain column name.
            return mapping[pk_name]

    return (cls.__name__, *(str(_pk_value(pk_name)) for pk_name in cls.pkcolumns))

__set_prefetch async classmethod

__set_prefetch(row, model, related)
PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Model

related

TYPE: Prefetch

Source code in edgy/core/db/models/mixins/row.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@classmethod
async def __set_prefetch(cls, row: "Row", model: "Model", related: "Prefetch") -> None:
    """
    Store the prefetched objects for `related` on `model` under `related.to_attr`.

    When the prefetch is finished, its baked results are looked up by the model key.
    Otherwise (or when the key is not baked) a queryset is filtered by the primary key
    values of `row` and awaited.

    :param row: Row providing the primary key values used for filtering.
    :param model: The model instance receiving the prefetched objects.
    :param related: The prefetch definition.
    :raises QuerySetError: If the provided queryset is not of the model class and no
        reverse path can be created.
    """
    model_key = ()
    if related._is_finished:
        await related.init_bake(type(model))
        model_key = model.create_model_key()
    if model_key in related._baked_results:
        setattr(model, related.to_attr, related._baked_results[model_key])
        return

    # Restrict the queryset to the row the model was created from.
    clauses = [
        getattr(model.table.columns, pkcol) == getattr(row, pkcol) for pkcol in cls.pkcolumns
    ]
    queryset = related.queryset
    if related._is_finished:
        assert queryset is not None, "Queryset is not set but _is_finished flag"
    else:
        check_prefetch_collision(model, related)
        crawl_result = crawl_relationship(
            model.__class__, related.related_name, traverse_last=True
        )
        if queryset is None:
            if crawl_result.reverse_path is False:
                queryset = model.__class__.query.all()
            else:
                queryset = crawl_result.model_class.query.all()

        if queryset.model_class == model.__class__:
            # queryset is of this model
            queryset = queryset.select_related(related.related_name)
            queryset.embed_parent = (related.related_name, "")
        elif crawl_result.reverse_path is False:
            # The error was previously only constructed, never raised, so the
            # incompatible queryset was silently filtered below.
            raise QuerySetError(
                detail=(
                    "Creating a reverse path is not possible, unidirectional fields used. "
                    f"You may want to use as queryset a queryset of model class {model!r}."
                )
            )
        else:
            # queryset is of the target model
            queryset = queryset.select_related(crawl_result.reverse_path)
    setattr(model, related.to_attr, await queryset.filter(*clauses))
__handle_prefetch_related(row, model, prefetch_related)

Handles any prefetch related scenario from the model. Loads in advance all the models needed for a specific record.

Recursively checks for the related field and validates if there is any conflicting attribute. If there is, a QuerySetError is raised.

PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Model

prefetch_related

TYPE: Sequence[Prefetch]

Source code in edgy/core/db/models/mixins/row.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
@classmethod
async def __handle_prefetch_related(
    cls,
    row: "Row",
    model: "Model",
    prefetch_related: Sequence["Prefetch"],
) -> None:
    """
    Run every prefetch of `prefetch_related` for the given model.

    Each prefetch is first checked for a name collision with the model and
    then scheduled; all scheduled prefetches are awaited together.
    Nothing is awaited when there is no prefetch at all.
    """
    pending = []
    for prefetch in prefetch_related:
        # Collisions are detected before the prefetch is scheduled.
        check_prefetch_collision(model=model, related=prefetch)
        pending.append(cls.__set_prefetch(row=row, model=model, related=prefetch))

    if not pending:
        return
    await asyncio.gather(*pending)

generate_proxy_model classmethod

generate_proxy_model()

Generates a proxy model for each model. This proxy model is a simple shallow copy of the original model being generated.

Source code in edgy/core/db/models/model.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@classmethod
def generate_proxy_model(cls) -> type["Model"]:
    """
    Build the proxy model of this model class.

    The proxy is created from shallow copies of the model fields and from a
    holder class that receives the public plain functions of the model which
    are not already available on `Model`.
    """
    copied_fields = {
        field_name: copy.copy(definition) for field_name, definition in cls.meta.fields.items()
    }

    class MethodHolder(Model):
        pass

    # Names the holder already has are never copied over from the model.
    inherited_names = set(dir(MethodHolder))
    candidate_names = (
        attr_name
        for attr_name in dir(cls)
        if not (attr_name in inherited_names or attr_name.startswith("_"))
    )
    for attr_name in candidate_names:
        # Static lookup: the raw attribute is inspected without triggering descriptors.
        raw_attribute = inspect.getattr_static(cls, attr_name)
        if inspect.isfunction(raw_attribute):
            setattr(MethodHolder, attr_name, raw_attribute)

    builder = ProxyModel(
        name=cls.__name__,
        module=cls.__module__,
        metadata=cls.meta,
        definitions=copied_fields,
        bases=(MethodHolder,),
    )
    builder.build()

    generify_model_fields(cast(type[EdgyBaseModel], builder.model))
    return cast(type[Model], builder.model)

_update async

_update(**kwargs)

Update operation of the database fields.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def _update(self, **kwargs: Any) -> Any:
    """
    Update operation of the database fields.

    Sends the `pre_update` signal, writes the extracted column values to the row
    identified by `identifying_clauses()`, runs the post save hooks and finally
    sends the `post_update` signal.

    :param kwargs: The field values to update.
    :return: Nothing is returned explicitly.
    """
    await self.meta.signals.pre_update.send_async(self.__class__, instance=self)
    column_values = self.extract_column_values(
        extracted_values=kwargs,
        is_partial=True,
        is_update=True,
        phase="prepare_update",
        instance=self,
        model_instance=self,
    )
    # empty updates shouldn't cause an error. E.g. only model references are updated
    clauses = self.identifying_clauses()
    # The instance stays registered in CURRENT_INSTANCE until the finally block resets it.
    token = CURRENT_INSTANCE.set(self)
    try:
        # The database is only touched when there is something to write and a row to target.
        if column_values and clauses:
            check_db_connection(self.database)
            async with self.database as database, database.transaction():
                # can update column_values
                column_values.update(
                    await self.execute_pre_save_hooks(
                        column_values, kwargs, force_insert=False
                    )
                )
                expression = self.table.update().values(**column_values).where(*clauses)
                await database.execute(expression)

            # Update the model instance.
            new_kwargs = self.transform_input(
                column_values, phase="post_update", instance=self
            )
            for k, v in new_kwargs.items():
                setattr(self, k, v)

        # updates aren't required to change the db, they can also just affect the meta fields
        await self.execute_post_save_hooks(
            cast(Sequence[str], kwargs.keys()), force_insert=False
        )

    finally:
        CURRENT_INSTANCE.reset(token)
    if column_values or kwargs:
        # Ensure on access refresh the results is active
        self._loaded_or_deleted = False
    await self.meta.signals.post_update.send_async(self.__class__, instance=self)

update async

update(**kwargs)
PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
138
139
140
141
142
143
144
async def update(self, **kwargs: Any) -> "Model":
    """
    Update the given fields and return the instance itself.

    The names of the passed fields are published through
    EXPLICIT_SPECIFIED_VALUES for the duration of the update.
    """
    explicit_token = EXPLICIT_SPECIFIED_VALUES.set(set(kwargs))
    try:
        await self._update(**kwargs)
    finally:
        # Always restore the previous value, even when the update fails.
        EXPLICIT_SPECIFIED_VALUES.reset(explicit_token)
    return self

delete async

delete(skip_post_delete_hooks=False, remove_referenced_call=False)

Delete operation from the database

PARAMETER DESCRIPTION
skip_post_delete_hooks

TYPE: bool DEFAULT: False

remove_referenced_call

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/model.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
async def delete(
    self, skip_post_delete_hooks: bool = False, remove_referenced_call: bool = False
) -> None:
    """
    Delete operation from the database.

    Sends the `pre_delete` signal, deletes the row identified by
    `identifying_clauses()`, calls `post_delete_callback` of the post delete fields
    with the values captured before the deletion and sends the `post_delete` signal.

    :param skip_post_delete_hooks: When set, no field values are captured and therefore
        no `post_delete_callback` is called.
    :param remove_referenced_call: Not read inside this method body.
    """
    await self.meta.signals.pre_delete.send_async(self.__class__, instance=self)
    # get values before deleting
    field_values: dict[str, Any] = {}
    if not skip_post_delete_hooks and self.meta.post_delete_fields:
        # With the "coro" behavior attribute access may hand back an awaitable,
        # which is awaited below.
        token = MODEL_GETATTR_BEHAVIOR.set("coro")
        try:
            for field_name in self.meta.post_delete_fields:
                try:
                    field_value = getattr(self, field_name)
                except AttributeError:
                    # Fields without an available value are skipped.
                    continue
                if inspect.isawaitable(field_value):
                    try:
                        field_value = await field_value
                    except AttributeError:
                        continue
                field_values[field_name] = field_value
        finally:
            MODEL_GETATTR_BEHAVIOR.reset(token)
    clauses = self.identifying_clauses()
    # Without identifying clauses no delete statement is executed.
    if clauses:
        expression = self.table.delete().where(*clauses)
        check_db_connection(self.database)
        async with self.database as database:
            await database.execute(expression)
    # we cannot load anymore
    self._loaded_or_deleted = True
    # now cleanup with the saved values
    for field_name, value in field_values.items():
        field = self.meta.fields[field_name]
        await field.post_delete_callback(value, instance=self)

    await self.meta.signals.post_delete.send_async(self.__class__, instance=self)

load async

load(only_needed=False)
PARAMETER DESCRIPTION
only_needed

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/model.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def load(self, only_needed: bool = False) -> None:
    """
    Fetch the row of this instance and copy its values onto the instance.

    With `only_needed` set, nothing happens when the instance is already
    marked as loaded or deleted. `ObjectNotFound` is raised when no row
    could be fetched.
    """
    already_done = only_needed and self._loaded_or_deleted
    if already_done:
        return

    fetched_row = None
    where_clauses = self.identifying_clauses()
    if where_clauses:
        # Select exactly the row identified by the clauses.
        select_query = self.table.select().where(*where_clauses)

        check_db_connection(self.database)
        async with self.database as database:
            fetched_row = await database.fetch_one(select_query)

    # No clauses or no matching row: there is nothing to load from.
    if fetched_row is None:
        raise ObjectNotFound("row does not exist anymore")

    loaded_values = self.transform_input(
        dict(fetched_row._mapping), phase="load", instance=self
    )
    self.__dict__.update(loaded_values)
    self._loaded_or_deleted = True

_insert async

_insert(**kwargs)

Performs the save instruction.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
async def _insert(self, **kwargs: Any) -> "Model":
    """
    Performs the save instruction.

    Extracts the column values from `kwargs`, executes the insert inside a
    transaction, copies the resulting values back onto the instance and runs the
    post save hooks when the model has post save fields.

    :param kwargs: The field values to insert.
    :return: The instance itself.
    """
    column_values: dict[str, Any] = self.extract_column_values(
        extracted_values=kwargs,
        is_partial=False,
        is_update=False,
        phase="prepare_insert",
        instance=self,
        model_instance=self,
    )
    check_db_connection(self.database)
    # The instance stays registered in CURRENT_INSTANCE until the finally block resets it.
    token = CURRENT_INSTANCE.set(self)
    try:
        async with self.database as database, database.transaction():
            # can update column_values
            column_values.update(
                await self.execute_pre_save_hooks(column_values, kwargs, force_insert=True)
            )
            expression = self.table.insert().values(**column_values)
            autoincrement_value = await database.execute(expression)
        # sqlalchemy supports only one autoincrement column
        if autoincrement_value:
            column = self.table.autoincrement_column
            # The result may be row-like; then the value is read from its mapping.
            if column is not None and hasattr(autoincrement_value, "_mapping"):
                autoincrement_value = autoincrement_value._mapping[column.key]
            # can be explicit set, which causes an invalid value returned
            if column is not None and column.key not in column_values:
                column_values[column.key] = autoincrement_value

        # Copy the written values back onto the instance.
        new_kwargs = self.transform_input(column_values, phase="post_insert", instance=self)
        for k, v in new_kwargs.items():
            setattr(self, k, v)

        if self.meta.post_save_fields:
            await self.execute_post_save_hooks(
                cast(Sequence[str], kwargs.keys()), force_insert=True
            )
    finally:
        CURRENT_INSTANCE.reset(token)
    # Ensure on access refresh the results is active
    self._loaded_or_deleted = False

    return self

save async

save(force_insert=False, values=None, force_save=None)

Performs a save of a given model instance. When creating a user it will make sure it can update existing or create a new one.

PARAMETER DESCRIPTION
force_insert

TYPE: bool DEFAULT: False

values

TYPE: Union[dict[str, Any], set[str], None] DEFAULT: None

force_save

TYPE: Optional[bool] DEFAULT: None

Source code in edgy/core/db/models/model.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
async def save(
    self,
    force_insert: bool = False,
    values: Union[dict[str, Any], set[str], None] = None,
    force_save: Optional[bool] = None,
) -> "Model":
    """
    Performs a save of a given model instance.
    When creating a user it will make sure it can update existing or
    create a new one.

    :param force_insert: Insert a new row instead of updating the existing one.
    :param values: Either a dict of values to save (its keys are treated as explicitly
        specified) or a set of field names that only marks those fields as explicitly
        specified.
    :param force_save: Deprecated alias of `force_insert`; emits a DeprecationWarning
        and overrides `force_insert` when not None.
    :return: The instance itself.
    """
    if force_save is not None:
        warnings.warn(
            "'force_save' is deprecated in favor of 'force_insert'",
            DeprecationWarning,
            stacklevel=2,
        )
        force_insert = force_save

    await self.meta.signals.pre_save.send_async(self.__class__, instance=self)

    extracted_fields = self.extract_db_fields()
    if values is None:
        explicit_values: set[str] = set()
    elif isinstance(values, set):
        # special mode for marking values as explicit values
        explicit_values = set(values)
        values = None
    else:
        explicit_values = set(values.keys())

    # With the "coro" behavior attribute access may hand back an awaitable,
    # which is awaited below.
    token = MODEL_GETATTR_BEHAVIOR.set("coro")
    try:
        for pkcolumn in self.__class__.pkcolumns:
            # should trigger load in case of identifying_db_fields
            value = getattr(self, pkcolumn, None)
            if inspect.isawaitable(value):
                value = await value
            # A missing autoincrement primary key means the row has to be inserted.
            if value is None and self.table.columns[pkcolumn].autoincrement:
                extracted_fields.pop(pkcolumn, None)
                force_insert = True
            field = self.meta.fields.get(pkcolumn)
            # this is an IntegerField with primary_key set
            if field is not None and getattr(field, "increment_on_save", 0) != 0:
                # we create a new revision.
                force_insert = True
                # Note: we definitely want this because it is easy to forget a force_insert
    finally:
        MODEL_GETATTR_BEHAVIOR.reset(token)

    # The explicitly specified names are published for the duration of the write.
    token2 = EXPLICIT_SPECIFIED_VALUES.set(explicit_values)
    try:
        if force_insert:
            if values:
                extracted_fields.update(values)
            # force save must ensure a complete mapping
            await self._insert(**extracted_fields)
        else:
            # An update only writes `values` when they were passed as a dict.
            await self._update(**(extracted_fields if values is None else values))
    finally:
        EXPLICIT_SPECIFIED_VALUES.reset(token2)
    await self.meta.signals.post_save.send_async(self.__class__, instance=self)
    return self

transaction

transaction(*, force_rollback=False, **kwargs)

Return database transaction for the assigned database

PARAMETER DESCRIPTION
force_rollback

TYPE: bool DEFAULT: False

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
314
315
316
def transaction(self, *, force_rollback: bool = False, **kwargs: Any) -> "Transaction":
    """
    Return the transaction created by the database assigned to this instance.

    `force_rollback` and all extra keyword arguments are forwarded unchanged.
    """
    assigned_database = self.database
    return assigned_database.transaction(force_rollback=force_rollback, **kwargs)