Schema class

edgy.core.connection.schemas.Schema

Schema(registry)

Manages all schema-related operations within the Edgy framework.

This class encapsulates functionalities for creating, dropping, and introspecting database schemas, ensuring proper model integration within multi-tenant or schema-isolated environments.

Initializes the Schema manager with a given registry.

PARAMETER DESCRIPTION
registry

The Edgy registry instance, providing access to models, databases, and other core components.

TYPE: Registry

Source code in edgy/core/connection/schemas.py
def __init__(self, registry: "Registry") -> None:
    """
    Initializes the Schema manager with a given registry.

    Args:
        registry: The Edgy registry instance, providing access to
                  models, databases, and other core components.
    """
    self.registry = registry

_default_schema instance-attribute

_default_schema

registry instance-attribute

registry = registry

database property

database

Provides direct access to the default database configured in the registry.

RETURNS DESCRIPTION
Database

The default database instance from the registry.

get_default_schema

get_default_schema()

Retrieves the default schema name from the underlying database dialect.

This method caches the default schema name after its first retrieval to optimize subsequent calls.

RETURNS DESCRIPTION
str | None

The name of the default schema, or None if not applicable or found.
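
The caching described above relies on `hasattr` rather than a `None` check, so a `None` result is cached as well and the dialect is not consulted a second time. A minimal stand-alone sketch of that pattern follows; `LazySchemaName` and its `lookup` callable are hypothetical stand-ins and not part of Edgy.

```python
from typing import Callable, Optional


class LazySchemaName:
    """Hypothetical stand-in that mimics the hasattr-based caching."""

    def __init__(self, lookup: Callable[[], Optional[str]]) -> None:
        self._lookup = lookup
        self.lookups = 0  # counts how often the lookup actually runs

    def get_default_schema(self) -> Optional[str]:
        # The attribute only exists after the first call, so even a
        # cached None is never looked up again.
        if not hasattr(self, "_default_schema"):
            self.lookups += 1
            self._default_schema = self._lookup()
        return self._default_schema


cache = LazySchemaName(lambda: None)
cache.get_default_schema()
cache.get_default_schema()
print(cache.lookups)  # the lookup ran once, although the result is None
```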

Source code in edgy/core/connection/schemas.py
def get_default_schema(self) -> str | None:
    """
    Retrieves the default schema name from the underlying database dialect.

    This method caches the default schema name after its first retrieval
    to optimize subsequent calls.

    Returns:
        The name of the default schema, or None if not applicable or found.
    """
    # Check if the _default_schema attribute has already been set.
    if not hasattr(self, "_default_schema"):
        # If not set, retrieve the default schema name from the database URL's
        # SQLAlchemy dialect and store it.
        self._default_schema = self.database.url.sqla_url.get_dialect(True).default_schema_name
    # Return the cached default schema name.
    return self._default_schema

activate_schema_path async

activate_schema_path(database, schema, is_shared=True)

Activates a specific schema within the database connection's search path.

This method modifies the search_path for the current database session, allowing queries to implicitly reference objects within the specified schema.

Warning: This method is deprecated and considered insecure due to improper schema escaping. It should not be used in production environments.

PARAMETER DESCRIPTION
database

The database instance on which to activate the schema path.

TYPE: Database

schema

The name of the schema to add to the search path.

TYPE: str

is_shared

If True, adds 'shared' to the search path along with the specified schema. Defaults to True.

TYPE: bool DEFAULT: True
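
To illustrate the escaping problem named in the warning: the statement is built by plain string interpolation, so a crafted schema name can smuggle extra SQL into it. The sketch below reproduces that string building and contrasts it with a hypothetical `quote_ident` helper that applies the standard SQL rule for quoted identifiers (wrap in double quotes, double any embedded double quote). Neither helper is part of Edgy.

```python
def build_search_path_unsafe(schema: str, is_shared: bool = True) -> str:
    # Mirrors the interpolation used by activate_schema_path.
    if is_shared:
        return f"SET search_path TO {schema}, shared;"
    return f"SET search_path TO {schema};"


def quote_ident(name: str) -> str:
    # Hypothetical helper: quote an identifier, doubling embedded quotes.
    return '"' + name.replace('"', '""') + '"'


def build_search_path_quoted(schema: str, is_shared: bool = True) -> str:
    # Same statement, but every schema name is emitted as a quoted identifier.
    parts = [quote_ident(schema)]
    if is_shared:
        parts.append(quote_ident("shared"))
    return "SET search_path TO " + ", ".join(parts) + ";"


hostile = "public; DROP SCHEMA tenant_a CASCADE; --"
print(build_search_path_unsafe(hostile))  # the DROP statement ends up in the SQL
print(build_search_path_quoted(hostile))  # the whole value stays one identifier
```

Quoted identifiers are case-sensitive in PostgreSQL, so a helper like this changes how mixed-case names resolve.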

Source code in edgy/core/connection/schemas.py
async def activate_schema_path(
    self, database: Database, schema: str, is_shared: bool = True
) -> None:
    """
    Activates a specific schema within the database connection's search path.

    This method modifies the `search_path` for the current database session,
    allowing queries to implicitly reference objects within the specified schema.

    Warning: This method is deprecated and considered insecure due to improper
    schema escaping. It should not be used in production environments.

    Args:
        database: The database instance on which to activate the schema path.
        schema: The name of the schema to add to the search path.
        is_shared: If True, adds 'shared' to the search path along with the
                   specified schema. Defaults to True.
    """
    # Issue a deprecation warning as this method is insecure.
    warnings.warn(
        "`activate_schema_path` is dangerous because the schema is not properly "
        "escaped and deprecated.",
        DeprecationWarning,
        stacklevel=2,
    )
    # Construct the SQL command to set the search_path.
    # If is_shared is True, include 'shared' in the path.
    path = (
        f"SET search_path TO {schema}, shared;"
        if is_shared
        else f"SET search_path TO {schema};"
    )
    # Convert the SQL string into a SQLAlchemy text expression.
    expression = sqlalchemy.text(path)
    # Execute the SQL expression on the provided database.
    await database.execute(expression)

create_schema async

create_schema(schema, if_not_exists=False, init_models=False, init_tenant_models=False, update_cache=True, databases=(None,))

Creates a new database schema and optionally initializes models within it.

This method handles the creation of a new schema and can populate it with tables for both regular models and tenant-specific models, respecting global field constraints.

PARAMETER DESCRIPTION
schema

The name of the schema to be created.

TYPE: str

if_not_exists

If True, the schema will only be created if it does not already exist, preventing an error. Defaults to False.

TYPE: bool DEFAULT: False

init_models

If True, all models registered with the registry will have their tables created within the new schema. Defaults to False.

TYPE: bool DEFAULT: False

init_tenant_models

If True, tenant-specific models will have their tables created within the new schema. This operation temporarily bypasses global field constraints. Defaults to False.

TYPE: bool DEFAULT: False

update_cache

If True, the model's schema cache will be updated. Defaults to True.

TYPE: bool DEFAULT: True

databases

A sequence of database names (keys from registry.extra) or None for the default database, on which the schema should be created. Defaults to (None,), meaning only the default database.

TYPE: Sequence[str | None] DEFAULT: (None,)

Raises: SchemaError: If there is an issue during schema creation or table initialization within the schema.
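
A minimal usage sketch, assuming a `Schema` instance is already at hand (the `schema_manager` argument). `provision_tenant` and `RecordingSchema` are hypothetical names; the latter only records calls so that the snippet runs without a database.

```python
import asyncio
from collections.abc import Sequence
from typing import Any, Optional


async def provision_tenant(schema_manager: Any, tenant: str) -> None:
    # Create the schema if it is missing and build the tenant model
    # tables inside it, on the default database only.
    await schema_manager.create_schema(
        tenant,
        if_not_exists=True,
        init_tenant_models=True,
        databases=(None,),
    )


class RecordingSchema:
    """Stand-in exposing the documented create_schema signature."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    async def create_schema(
        self,
        schema: str,
        if_not_exists: bool = False,
        init_models: bool = False,
        init_tenant_models: bool = False,
        update_cache: bool = True,
        databases: Sequence[Optional[str]] = (None,),
    ) -> None:
        # Record the effective arguments instead of touching a database.
        self.calls.append(
            {
                "schema": schema,
                "if_not_exists": if_not_exists,
                "init_models": init_models,
                "init_tenant_models": init_tenant_models,
                "update_cache": update_cache,
                "databases": tuple(databases),
            }
        )


recorder = RecordingSchema()
asyncio.run(provision_tenant(recorder, "tenant_a"))
print(recorder.calls)
```

With a real `Schema` instance, the same call would create the schema and the tenant tables, and a failure would surface as `SchemaError` as described above.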

Source code in edgy/core/connection/schemas.py
async def create_schema(
    self,
    schema: str,
    if_not_exists: bool = False,
    init_models: bool = False,
    init_tenant_models: bool = False,
    update_cache: bool = True,
    databases: Sequence[str | None] = (None,),
) -> None:
    """
    Creates a new database schema and optionally initializes models within it.

    This method handles the creation of a new schema and can populate it
    with tables for both regular models and tenant-specific models,
    respecting global field constraints.

    Args:
        schema: The name of the schema to be created.
        if_not_exists: If True, the schema will only be created if it does
                       not already exist, preventing an error. Defaults to False.
        init_models: If True, all models registered with the registry will have
                     their tables created within the new schema. Defaults to False.
        init_tenant_models: If True, tenant-specific models will have their
                           tables created within the new schema. This operation
                           temporarily bypasses global field constraints. Defaults to False.
        update_cache: If True, the model's schema cache will be updated.
                      Defaults to True.
        databases: A sequence of database names (keys from `registry.extra`)
                   or None for the default database, on which the schema
                   should be created. Defaults to `(None,)`, meaning only
                   the default database.
    Raises:
        SchemaError: If there is an issue during schema creation or table
                     initialization within the schema.
    """
    tenant_tables: list[sqlalchemy.Table] = []
    # If init_models is True, iterate through all registered models and
    # update their table schema and cache.
    if init_models:
        for model_class in self.registry.models.values():
            model_class.table_schema(schema=schema, update_cache=update_cache)

    # If init_tenant_models is True, handle the creation of tenant-specific model tables.
    if init_tenant_models:
        # Temporarily disable global field constraints for tenant model building.
        token = NO_GLOBAL_FIELD_CONSTRAINTS.set(True)
        try:
            # Iterate through tenant models and build their tables with the specified schema.
            for model_class in self.registry.tenant_models.values():
                tenant_tables.append(model_class.build(schema=schema))
        finally:
            # Ensure global field constraints are re-enabled after processing tenant models.
            NO_GLOBAL_FIELD_CONSTRAINTS.reset(token)

        # Perform a second pass to add global field constraints to tenant models.
        for model_class in self.registry.tenant_models.values():
            model_class.add_global_field_constraints(schema=schema)

    def execute_create(connection: sqlalchemy.Connection, name: str | None) -> None:
        """
        Internal helper function to execute the schema and table creation
        within a given database connection.
        """
        try:
            # Attempt to create the schema.
            connection.execute(
                sqlalchemy.schema.CreateSchema(name=schema, if_not_exists=if_not_exists)
            )
        except ProgrammingError as e:
            # Raise a SchemaError if there's a programming error during schema creation.
            raise SchemaError(detail=e.orig.args[0]) from e

        # If tenant_tables exist, create them within the schema.
        if tenant_tables:
            self.registry.metadata_by_name[name].create_all(
                connection, checkfirst=if_not_exists, tables=tenant_tables
            )
        # If init_models is True, create all registered model tables within the schema.
        if init_models:
            self.registry.metadata_by_name[name].create_all(
                connection, checkfirst=if_not_exists
            )

    ops = []
    # Iterate through the specified databases to perform schema creation.
    for database_name in databases:
        # Determine which database instance to use based on database_name.
        db = (
            self.registry.database
            if database_name is None
            else self.registry.extra[database_name]
        )
        # Enter an asynchronous context for the database connection, disabling rollback.
        # This prevents a warning about poor performance.
        async with db as db:
            with db.force_rollback(False):
                # Append the run_sync operation to the list of operations.
                ops.append(db.run_sync(execute_create, database_name))
    # Await all schema creation operations concurrently.
    await asyncio.gather(*ops)
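
The `set(True)` / `reset(token)` pair around the tenant model build in the listing above follows the usual `contextvars` token pattern: the flag is switched on for the duration of the build and restored in `finally`. A stand-alone sketch, in which `NO_CONSTRAINTS` and `build_without_constraints` are hypothetical stand-ins for Edgy's internals:

```python
from contextvars import ContextVar

# Hypothetical flag standing in for NO_GLOBAL_FIELD_CONSTRAINTS.
NO_CONSTRAINTS: ContextVar[bool] = ContextVar("NO_CONSTRAINTS", default=False)


def build_without_constraints(names: list[str]) -> list[tuple[str, bool]]:
    token = NO_CONSTRAINTS.set(True)
    try:
        # Each simulated "build" observes the flag as True.
        return [(name, NO_CONSTRAINTS.get()) for name in names]
    finally:
        # Restore the previous value even if a build raises.
        NO_CONSTRAINTS.reset(token)


built = build_without_constraints(["users", "orders"])
print(built, NO_CONSTRAINTS.get())
```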

drop_schema async

drop_schema(schema, cascade=False, if_exists=False, databases=(None,))

Drops an existing database schema, optionally cascading the drop to all contained objects.

PARAMETER DESCRIPTION
schema

The name of the schema to be dropped.

TYPE: str

cascade

If True, all objects (tables, views, etc.) within the schema will also be dropped. Defaults to False.

TYPE: bool DEFAULT: False

if_exists

If True, the schema will only be dropped if it exists, preventing an error if it does not. Defaults to False.

TYPE: bool DEFAULT: False

databases

A sequence of database names (keys from registry.extra) or None for the default database, from which the schema should be dropped. Defaults to (None,), meaning only the default database.

TYPE: Sequence[str | None] DEFAULT: (None,)

Raises: SchemaError: If there is an issue during the schema drop operation.
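
One way to combine this method with `create_schema` is a scratch schema that is always cleaned up. The sketch below assumes only the two documented signatures; `temporary_schema` and `RecordingSchema` are hypothetical names, and the recorder replaces a real `Schema` instance so that the snippet runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def temporary_schema(schema_manager: Any, name: str) -> AsyncIterator[str]:
    await schema_manager.create_schema(name, if_not_exists=True)
    try:
        yield name
    finally:
        # cascade=True also removes objects created inside the schema;
        # if_exists=True keeps cleanup from failing if it is already gone.
        await schema_manager.drop_schema(name, cascade=True, if_exists=True)


class RecordingSchema:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str, dict[str, Any]]] = []

    async def create_schema(self, schema: str, **options: Any) -> None:
        self.events.append(("create", schema, options))

    async def drop_schema(self, schema: str, **options: Any) -> None:
        self.events.append(("drop", schema, options))


async def main() -> list[tuple[str, str, dict[str, Any]]]:
    recorder = RecordingSchema()
    async with temporary_schema(recorder, "scratch") as name:
        recorder.events.append(("use", name, {}))
    return recorder.events


events = asyncio.run(main())
print(events)
```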

Source code in edgy/core/connection/schemas.py
async def drop_schema(
    self,
    schema: str,
    cascade: bool = False,
    if_exists: bool = False,
    databases: Sequence[str | None] = (None,),
) -> None:
    """
    Drops an existing database schema, optionally cascading the drop
    to all contained objects.

    Args:
        schema: The name of the schema to be dropped.
        cascade: If True, all objects (tables, views, etc.) within the
                 schema will also be dropped. Defaults to False.
        if_exists: If True, the schema will only be dropped if it exists,
                   preventing an error if it does not. Defaults to False.
        databases: A sequence of database names (keys from `registry.extra`)
                   or None for the default database, from which the schema
                   should be dropped. Defaults to `(None,)`, meaning only
                   the default database.
    Raises:
        SchemaError: If there is an issue during schema drop operation.
    """

    def execute_drop(connection: sqlalchemy.Connection) -> None:
        """
        Internal helper function to execute the schema drop
        within a given database connection.
        """
        try:
            # Attempt to drop the schema.
            connection.execute(
                sqlalchemy.schema.DropSchema(name=schema, cascade=cascade, if_exists=if_exists)
            )
        except DBAPIError as e:
            # Raise a SchemaError if there's a database API error during schema drop.
            raise SchemaError(detail=e.orig.args[0]) from e

    ops = []
    # Iterate through the specified databases to perform schema drop.
    for database_name in databases:
        # Determine which database instance to use based on database_name.
        db = (
            self.registry.database
            if database_name is None
            else self.registry.extra[database_name]
        )
        # Enter an asynchronous context for the database connection, disabling rollback.
        # This prevents a warning about poor performance.
        async with db as db:
            with db.force_rollback(False):
                # Append the run_sync operation to the list of operations.
                ops.append(db.run_sync(execute_drop))
    # Await all schema drop operations concurrently.
    await asyncio.gather(*ops)

get_metadata_of_all_schemes async

get_metadata_of_all_schemes(database, *, no_reflect=False)

Retrieves metadata and a list of all schema names for a given database.

This method reflects the table structures for registered models within each discovered schema if no_reflect is False.

PARAMETER DESCRIPTION
database

The database instance from which to retrieve schema metadata.

TYPE: Database

no_reflect

If True, tables will not be reflected into the metadata. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[MetaData, list[str]]

A tuple containing:

- sqlalchemy.MetaData: An SQLAlchemy MetaData object populated with reflected tables (if no_reflect is False).
- list[str]: A list of schema names found in the database.
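
According to the source listing for this method, the default schema is reported as an empty string rather than by name. The sketch below isolates that mapping; `normalize_schema_names` is a hypothetical helper and the schema names are sample values.

```python
from typing import Optional


def normalize_schema_names(
    schema_names: list[str], default_schema_name: Optional[str]
) -> list[str]:
    # Replace the default schema's name with "", keep all other names.
    return ["" if default_schema_name == schema else schema for schema in schema_names]


names = normalize_schema_names(["information_schema", "public", "tenant_a"], "public")
print(names)  # "public" is replaced by an empty string
```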

Source code in edgy/core/connection/schemas.py
async def get_metadata_of_all_schemes(
    self, database: Database, *, no_reflect: bool = False
) -> tuple[sqlalchemy.MetaData, list[str]]:
    """
    Retrieves metadata and a list of all schema names for a given database.

    This method reflects the table structures for registered models
    within each discovered schema if `no_reflect` is False.

    Args:
        database: The database instance from which to retrieve schema metadata.
        no_reflect: If True, tables will not be reflected into the metadata.
                    Defaults to False.

    Returns:
        A tuple containing:
            - sqlalchemy.MetaData: An SQLAlchemy MetaData object populated
                                   with reflected tables (if `no_reflect` is False).
            - list[str]: A list of schema names found in the database.
    """
    # Get a set of all table names registered in the registry.
    tablenames = self.registry.get_tablenames()

    async with database as database:
        # Initialize an empty list to store schema names.
        list_schemes: list[str] = []
        # Create a new SQLAlchemy MetaData object.
        metadata = sqlalchemy.MetaData()
        # Force no rollback for the database connection.
        with database.force_rollback(False):

            def wrapper(connection: sqlalchemy.Connection) -> None:
                """
                Internal wrapper function to perform synchronous database
                inspection and reflection.
                """
                nonlocal list_schemes
                # Create an inspector from the database connection.
                inspector = sqlalchemy.inspect(connection)
                # Get the default schema name from the inspector.
                default_schema_name = inspector.default_schema_name
                # Get all schema names and replace the default schema name with an empty string.
                list_schemes = [
                    "" if default_schema_name == schema else schema
                    for schema in inspector.get_schema_names()
                ]
                # If no_reflect is False, reflect table metadata for each schema.
                if not no_reflect:
                    for schema in list_schemes:
                        metadata.reflect(
                            connection, schema=schema, only=lambda name, _: name in tablenames
                        )

            # Run the synchronous wrapper function on the database.
            await database.run_sync(wrapper)
            # Return the populated metadata and the list of schema names.
            return metadata, list_schemes

get_schemes_tree async

get_schemes_tree(*, no_reflect=False)

Builds a comprehensive tree-like structure of schemas across all registered databases.

Each entry in the resulting dictionary represents a database (identified by its name or None for the default), containing its URL, its SQLAlchemy MetaData, and a list of schema names.

PARAMETER DESCRIPTION
no_reflect

If True, tables will not be reflected into the metadata for any schema. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str | None, tuple[str, MetaData, list[str]]]

A dictionary where keys are database names (or None for the default database) and values are tuples containing:

- str: The URL of the database.
- sqlalchemy.MetaData: The MetaData object for the database.
- list[str]: A list of schema names found in that database.
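
A short sketch of how the returned mapping might be consumed. The `sample_tree` data is invented for illustration, `describe_schemes_tree` is a hypothetical helper, and a plain `object()` stands in for each `sqlalchemy.MetaData` so that the snippet has no dependencies.

```python
from typing import Any, Optional

# Same shape as the documented return type, with Any in place of MetaData.
SchemesTree = dict[Optional[str], tuple[str, Any, list[str]]]


def describe_schemes_tree(tree: SchemesTree) -> list[str]:
    lines = []
    for db_name, (url, _metadata, schemes) in tree.items():
        label = "default" if db_name is None else db_name
        # An empty string marks the database's default schema
        # (see the get_metadata_of_all_schemes listing).
        shown = [schema or "<default schema>" for schema in schemes]
        lines.append(f"{label} ({url}): {', '.join(shown)}")
    return lines


sample_tree: SchemesTree = {
    None: ("postgresql://localhost/app", object(), ["", "tenant_a"]),
    "analytics": ("postgresql://localhost/analytics", object(), [""]),
}
for line in describe_schemes_tree(sample_tree):
    print(line)
```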

Source code in edgy/core/connection/schemas.py
async def get_schemes_tree(
    self, *, no_reflect: bool = False
) -> dict[str | None, tuple[str, sqlalchemy.MetaData, list[str]]]:
    """
    Builds a comprehensive tree-like structure of schemas across all
    registered databases.

    Each entry in the resulting dictionary represents a database (identified
    by its name or None for the default), containing its URL, its SQLAlchemy
    MetaData, and a list of schema names.

    Args:
        no_reflect: If True, tables will not be reflected into the metadata
                    for any schema. Defaults to False.

    Returns:
        A dictionary where keys are database names (or None for the default
        database) and values are tuples containing:
            - str: The URL of the database.
            - sqlalchemy.MetaData: The MetaData object for the database.
            - list[str]: A list of schema names found in that database.
    """
    # Initialize the schemes_tree dictionary.
    schemes_tree: dict[str | None, tuple[str, sqlalchemy.MetaData, list[str]]] = {
        None: (
            str(self.database.url),
            # Get metadata and schemes for the default database.
            *(
                await self.get_metadata_of_all_schemes(
                    self.registry.database, no_reflect=no_reflect
                )
            ),
        )
    }
    # Iterate through extra databases registered in the registry.
    for key, val in self.registry.extra.items():
        # Populate schemes_tree for each extra database.
        schemes_tree[key] = (
            str(val.url),
            # Get metadata and schemes for the current extra database.
            *(await self.get_metadata_of_all_schemes(val, no_reflect=no_reflect)),
        )
    # Return the complete schemes_tree.
    return schemes_tree