Skip to content

Schema class

edgy.core.connection.schemas.Schema

Schema(registry)

Groups all schema operations in a single object.

All operations on a database schema (creating it, dropping it, resolving the default schema) are exposed through this object.

PARAMETER DESCRIPTION
registry

TYPE: Registry

Source code in edgy/core/connection/schemas.py
24
25
def __init__(self, registry: "Registry") -> None:
    """
    Bind the schema operations to a registry.

    Args:
        registry: The registry to operate on; it is stored unchanged on the
            instance as ``registry``.
    """
    self.registry = registry

_default_schema instance-attribute

_default_schema

registry instance-attribute

registry = registry

database property

database

get_default_schema

get_default_schema()

Returns the default schema, which is usually None.

Source code in edgy/core/connection/schemas.py
31
32
33
34
35
36
37
def get_default_schema(self) -> Optional[str]:
    """
    Return the default schema name reported by the database dialect.

    The value is looked up on the first call and stored on the instance as
    ``_default_schema``; later calls return the stored value (which may be
    ``None``) without asking the dialect again.
    """
    # Already resolved once: reuse the stored value, even when it is None.
    if hasattr(self, "_default_schema"):
        return self._default_schema
    dialect = self.database.url.sqla_url.get_dialect(True)
    self._default_schema = dialect.default_schema_name
    return self._default_schema

activate_schema_path async

activate_schema_path(database, schema, is_shared=True)
PARAMETER DESCRIPTION
database

TYPE: Database

schema

TYPE: str

is_shared

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/schemas.py
39
40
41
42
43
44
45
46
47
48
49
async def activate_schema_path(
    self, database: Database, schema: str, is_shared: bool = True
) -> None:
    """
    Point the search path of ``database`` at ``schema``.

    Args:
        database: Database on which the ``SET search_path`` statement is executed.
        schema: Schema name placed first in the search path.
        is_shared: When true, the ``shared`` schema is appended to the path.
    """
    # INSECURE, but not used: ``schema`` is interpolated into the SQL text
    # without quoting or escaping, so it must never come from untrusted input.
    if is_shared:
        statement = f"SET search_path TO {schema}, shared;"
    else:
        statement = f"SET search_path TO {schema};"
    await database.execute(sqlalchemy.text(statement))

create_schema async

create_schema(schema, if_not_exists=False, init_models=False, init_tenant_models=False, update_cache=True, databases=(None,))

Creates a model schema if it does not exist.

PARAMETER DESCRIPTION
schema

TYPE: str

if_not_exists

TYPE: bool DEFAULT: False

init_models

TYPE: bool DEFAULT: False

init_tenant_models

TYPE: bool DEFAULT: False

update_cache

TYPE: bool DEFAULT: True

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/schemas.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def create_schema(
    self,
    schema: str,
    if_not_exists: bool = False,
    init_models: bool = False,
    init_tenant_models: bool = False,
    update_cache: bool = True,
    databases: Sequence[Union[str, None]] = (None,),
) -> None:
    """
    Create a schema on the selected databases, optionally with model tables.

    Args:
        schema: Name of the schema to create.
        if_not_exists: Forwarded to ``CreateSchema`` and used as ``checkfirst``
            when tables are created.
        init_models: Build the tables of all registry models for the schema
            and create them through ``registry.metadata.create_all``.
        init_tenant_models: Build the tables of the registry's tenant models
            for the schema. Without ``init_models`` these tables are created
            individually.
        update_cache: Forwarded to ``table_schema`` of every model.
        databases: Databases to operate on. ``None`` selects
            ``registry.database``; a string is looked up in ``registry.extra``.

    Raises:
        SchemaError: If executing ``CreateSchema`` raises a ``ProgrammingError``.
    """
    tables: list[sqlalchemy.Table] = []
    if init_models:
        for model_class in self.registry.models.values():
            model_class.table_schema(schema=schema, update_cache=update_cache)
    if init_tenant_models:
        for model_class in self.registry.tenant_models.values():
            table = model_class.table_schema(schema=schema, update_cache=update_cache)
            # With init_models the tables are left to metadata.create_all below
            # (presumably table_schema registers them in the metadata - confirm).
            if not init_models:
                tables.append(table)

    def execute_create(connection: "sqlalchemy.Connection") -> None:
        try:
            connection.execute(
                sqlalchemy.schema.CreateSchema(name=schema, if_not_exists=if_not_exists)  # type: ignore
            )
        except ProgrammingError as e:
            raise SchemaError(detail=e.orig.args[0]) from e
        for table in tables:
            table.create(connection, checkfirst=if_not_exists)
        if init_models:
            self.registry.metadata.create_all(connection, checkfirst=if_not_exists)

    async def create_on(db: "Database") -> None:
        # The operation is awaited INSIDE both contexts. A coroutine does not
        # start running until it is awaited, so collecting it here and awaiting
        # it later would execute the DDL after the contexts were left.
        # don't warn here about inperformance
        async with db as connected:
            with connected.force_rollback(False):
                await connected.run_sync(execute_create)

    # Resolve every database first so an unknown name fails before any DDL runs.
    targets = [
        self.registry.database if database is None else self.registry.extra[database]
        for database in databases
    ]
    await asyncio.gather(*(create_on(db) for db in targets))

drop_schema async

drop_schema(schema, cascade=False, if_exists=False, databases=(None,))

Drops an existing model schema.

PARAMETER DESCRIPTION
schema

TYPE: str

cascade

TYPE: bool DEFAULT: False

if_exists

TYPE: bool DEFAULT: False

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/schemas.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def drop_schema(
    self,
    schema: str,
    cascade: bool = False,
    if_exists: bool = False,
    databases: Sequence[Union[str, None]] = (None,),
) -> None:
    """
    Drop a schema on the selected databases.

    Args:
        schema: Name of the schema to drop.
        cascade: Forwarded to ``DropSchema``.
        if_exists: Forwarded to ``DropSchema``.
        databases: Databases to operate on. ``None`` selects
            ``registry.database``; a string is looked up in ``registry.extra``.

    Raises:
        SchemaError: If executing ``DropSchema`` raises a ``DBAPIError``.
    """

    def execute_drop(connection: "sqlalchemy.Connection") -> None:
        try:
            connection.execute(
                sqlalchemy.schema.DropSchema(name=schema, cascade=cascade, if_exists=if_exists)  # type: ignore
            )
        except DBAPIError as e:
            raise SchemaError(detail=e.orig.args[0]) from e

    async def drop_on(db: "Database") -> None:
        # The operation is awaited INSIDE both contexts. A coroutine does not
        # start running until it is awaited, so collecting it here and awaiting
        # it later would execute the DDL after the contexts were left.
        # don't warn here about inperformance
        async with db as connected:
            with connected.force_rollback(False):
                await connected.run_sync(execute_drop)

    # Resolve every database first so an unknown name fails before any DDL runs.
    targets = [
        self.registry.database if database is None else self.registry.extra[database]
        for database in databases
    ]
    await asyncio.gather(*(drop_on(db) for db in targets))