# -*- coding: utf-8 -*-
import os
from alembic import command as alembic_command
from alembic.config import Config as AlembicConfig
from sqlalchemy import create_engine
from watson.common import imports
from watson.console import command, ConsoleError
from watson.console.decorators import arg
from watson.db import engine, fixtures, session
from watson.di import ContainerAware
class Config(AlembicConfig):
def get_template_directory(self):
package_dir = os.path.abspath(os.path.dirname(__file__))
return os.path.join(package_dir, 'alembic', 'templates')
class BaseDatabaseCommand(ContainerAware):
__ioc_definition__ = {
'init': {
'config': lambda container: container.get('application.config')['db']
}
}
def __init__(self, config):
self.config = config
[docs]class Database(command.Base, BaseDatabaseCommand):
"""Database commands.
"""
name = 'db'
@property
def metadata(self):
metadatas = {}
for name, options in self.config['connections'].items():
metadata = options['metadata']
if isinstance(metadata, str):
metadata = imports.load_definition_from_string(metadata)
metadatas[name] = metadata
return metadatas
@property
def connections(self):
connections = {}
for name, options in self.config['connections'].items():
connections[name] = options['connection_string']
return connections
@property
def sessions(self):
return self._session_or_engine('session')
@property
def engines(self):
return self._session_or_engine('engine')
[docs] def _session_or_engine(self, type_):
"""Retrieves all the sessions or engines from the container.
"""
results = {}
for name in self.config['connections']:
obj_name = getattr(globals()[type_], 'NAME').format(name)
results[name] = self.container.get(obj_name)
return results
@arg('drop', action='store_true', default=False, optional=True)
[docs] def create(self, drop):
"""Create the relevant databases.
"""
engines = self.engines
for database, model_base in self.metadata.items():
self.write('Creating database {}...'.format(database))
engine.create_db(engines[database], model_base, drop=drop)
self.write('Created the databases.')
return True
@arg()
[docs] def populate(self):
"""Add data from fixtures to the database(s).
"""
if 'fixtures' not in self.config:
self.write('No fixtures to add.')
return False
self.write('Adding fixtures...')
sessions = self.sessions
total = fixtures.populate_all(sessions, self.config['fixtures'])
self.write('Added {} fixtures to {} database(s).'.format(
total, len(sessions)))
return True
@arg()
[docs] def dump(self):
"""Print the Schema of the database.
"""
def dump_sql(sql, *multiparams, **params):
self.write(str(sql))
connections = self.connections
for database, model_base in self.metadata.items():
self.write('Schema for "{}" from metadata {}...'.format(database, repr(model_base)))
self.write()
_engine = create_engine(
connections[database], strategy='mock', executor=dump_sql)
model_base.metadata.create_all(_engine, checkfirst=False)
return True
[docs]class Migrate(command.Base, BaseDatabaseCommand):
"""Alembic integration with Watson.
"""
name = 'db:migrate'
def _check_migrations(self):
if 'migrations' not in self.config:
raise ConsoleError(
'No migrations configuration can be found.')
@property
def database_names(self):
names = []
for name in self.config['connections']:
names.append(name)
return names
@property
def directory(self):
self._check_migrations()
return os.path.abspath(self.config['migrations']['path'])
@property
def alembic_config_file(self):
return os.path.join(self.directory, 'alembic.ini')
def alembic_config(self, with_ini=True):
self._check_migrations()
directory = os.path.abspath(self.config['migrations']['path'])
args = []
if with_ini:
args.append(self.alembic_config_file)
config = Config(*args)
config.set_main_option('script_location', directory)
config.set_main_option('databases', ', '.join(self.database_names))
config.watson = {
'config': self.config,
'container': self.container
}
return config
@arg()
[docs] def init(self):
"""Initializes Alembic migrations for the project.
"""
config = self.alembic_config(with_ini=False)
config.config_file_name = self.alembic_config_file
alembic_command.init(config, config.get_main_option('script_location'), 'watson')
return True
@arg()
[docs] def history(self, rev_range):
"""List changeset scripts in chronological order.
Args:
rev_range: Revision range in format [start]:[end]
"""
config = self.alembic_config()
alembic_command.history(config, rev_range)
return True
@arg()
[docs] def current(self):
"""Display the current revision for each database.
"""
config = self.alembic_config()
alembic_command.current(config)
return True
@arg('sql', action='store_true', default=False, optional=True)
@arg('autogenerate', action='store_true', default=False, optional=True)
@arg('message', optional=True, default='Revision')
[docs] def revision(self, sql=False, autogenerate=False, message=None):
"""Create a new revision file.
Args:
sql (bool): Don't emit SQL to database - dump to standard output instead
autogenerate (bool): Populate revision script with andidate migration operatons, based on comparison of database to model
message (string): Message string to use with 'revision'
"""
config = self.alembic_config()
return alembic_command.revision(
config, message, autogenerate=autogenerate, sql=sql)
@arg('sql', action='store_true', default=False, optional=True)
@arg('tag', default=None, optional=True)
@arg('revision', default=None)
[docs] def stamp(self, sql=False, tag=None, revision='head'):
"""'stamp' the revision table with the given revision; don't run any migrations.
Args:
sql (bool): Don't emit SQL to database - dump to standard output instead
tag (string): Arbitrary 'tag' name - can be used by custom env.py scripts
revision (string): Revision identifier
"""
config = self.alembic_config()
alembic_command.stamp(config, revision, tag=tag, sql=sql)
return True
@arg('sql', action='store_true', default=False, optional=True)
@arg('tag', default=None, optional=True)
@arg('revision', default='head', nargs='?')
[docs] def upgrade(self, sql=False, tag=None, revision='head'):
"""Upgrade to a later version.
Args:
sql (bool): Don't emit SQL to database - dump to standard output instead
tag (string): Arbitrary 'tag' name - can be used by custom env.py scripts
revision (string): Revision identifier
"""
config = self.alembic_config()
alembic_command.upgrade(config, revision, tag=tag, sql=sql)
return True
@arg('sql', action='store_true', default=False, optional=True)
@arg('tag', default=None, optional=True)
@arg('revision', default='-1', nargs='?')
[docs] def downgrade(self, sql=False, tag=None, revision='-1'):
"""Revert to a previous version.
"""
config = self.alembic_config()
alembic_command.downgrade(config, revision, tag=tag, sql=sql)
return True
@arg()
[docs] def branches(self):
"""Show current un-spliced branch points.
"""
config = self.alembic_config()
alembic_command.branches(config)
return True