refactor: streamline migrations

This commit is contained in:
2025-03-09 23:22:24 +02:00
parent aaab9069bf
commit 9a23f5c79d
2 changed files with 101 additions and 88 deletions

View File

@ -62,24 +62,31 @@ class Migrations(LazyLogging):
"""
return Migrations(connection, configuration).run()
def migration(self, cursor: Cursor, migration: Migration) -> None:
def apply_migrations(self, migrations: list[Migration]) -> None:
"""
perform single migration
perform migrations explicitly
Args:
cursor(Cursor): connection cursor
migration(Migration): single migration to perform
migrations(list[Migration]): list of migrations to perform
"""
self.logger.info("applying table migration %s at index %s", migration.name, migration.index)
for statement in migration.steps:
cursor.execute(statement)
self.logger.info("table migration %s at index %s has been applied", migration.name, migration.index)
self.logger.info("perform data migration %s at index %s", migration.name, migration.index)
migration.migrate_data(self.connection, self.configuration)
self.logger.info(
"data migration %s at index %s has been performed",
migration.name, migration.index)
previous_isolation = self.connection.isolation_level
try:
self.connection.isolation_level = None
cursor = self.connection.cursor()
try:
cursor.execute("begin exclusive")
for migration in migrations:
self.perform_migration(cursor, migration)
except Exception:
self.logger.exception("migration failed with exception")
cursor.execute("rollback")
raise
else:
cursor.execute("commit")
finally:
cursor.close()
finally:
self.connection.isolation_level = previous_isolation
def migrations(self) -> list[Migration]:
"""
@ -114,6 +121,25 @@ class Migrations(LazyLogging):
return migrations
def perform_migration(self, cursor: Cursor, migration: Migration) -> None:
"""
perform single migration
Args:
cursor(Cursor): connection cursor
migration(Migration): single migration to perform
"""
self.logger.info("applying table migration %s at index %s", migration.name, migration.index)
for statement in migration.steps:
cursor.execute(statement)
self.logger.info("table migration %s at index %s has been applied", migration.name, migration.index)
self.logger.info("perform data migration %s at index %s", migration.name, migration.index)
migration.migrate_data(self.connection, self.configuration)
self.logger.info(
"data migration %s at index %s has been performed",
migration.name, migration.index)
def run(self) -> MigrationResult:
"""
perform migrations
@ -122,6 +148,7 @@ class Migrations(LazyLogging):
MigrationResult: current schema version
"""
migrations = self.migrations()
current_version = self.user_version()
expected_version = len(migrations)
result = MigrationResult(old_version=current_version, new_version=expected_version)
@ -130,25 +157,8 @@ class Migrations(LazyLogging):
self.logger.info("no migrations required")
return result
previous_isolation = self.connection.isolation_level
try:
self.connection.isolation_level = None
cursor = self.connection.cursor()
try:
cursor.execute("begin exclusive")
for migration in migrations[current_version:]:
self.migration(cursor, migration)
cursor.execute(f"pragma user_version = {expected_version}") # no support for ? placeholders
except Exception:
self.logger.exception("migration failed with exception")
cursor.execute("rollback")
raise
else:
cursor.execute("commit")
finally:
cursor.close()
finally:
self.connection.isolation_level = previous_isolation
self.apply_migrations(migrations[current_version:])
self.connection.execute(f"pragma user_version = {expected_version}") # no support for ? placeholders
self.logger.info("migrations have been performed from version %s to %s", result.old_version, result.new_version)
return result