diff --git a/ENVIRONMENT.rst b/ENVIRONMENT.rst index a8d3703b7..690312e21 100644 --- a/ENVIRONMENT.rst +++ b/ENVIRONMENT.rst @@ -78,3 +78,4 @@ Environment Configuration Settings - **KUBERNETES_ROLE_LABEL**: name of the label containing Postgres role when running on Kubernetens. Default is 'spilo-role'. - **KUBERNETES_SCOPE_LABEL**: name of the label containing cluster name. Default is 'version'. - **KUBERNETES_LABELS**: a JSON describing names and values of other labels used by Patroni on Kubernetes to locate its metadata. Default is '{"application": "spilo"}'. +- **ENABLE_WAL_PATH_COMPAT**: old Spilo images were generating the WAL path in the backup store using the following template ``/spilo/{WAL_BUCKET_SCOPE_PREFIX}{SCOPE}{WAL_BUCKET_SCOPE_SUFFIX}/wal/``, while new images add one additional directory (``{PGVERSION}``) to the end. In order to avoid (unlikely) issues with restoring WALs (from S3/GCS/and so on) when switching to ``spilo-13``, please set ``ENABLE_WAL_PATH_COMPAT=true`` when deploying an old cluster with ``spilo-13`` for the first time. After that the environment variable can be removed. The change of the WAL path also means that backups stored in the old location will not be cleaned up automatically. 
diff --git a/postgres-appliance/Dockerfile b/postgres-appliance/Dockerfile index d65b189ca..3c0c73486 100644 --- a/postgres-appliance/Dockerfile +++ b/postgres-appliance/Dockerfile @@ -9,7 +9,7 @@ FROM ubuntu:18.04 as builder-false RUN export DEBIAN_FRONTEND=noninteractive \ && echo 'APT::Install-Recommends "0";\nAPT::Install-Suggests "0";' > /etc/apt/apt.conf.d/01norecommend \ && apt-get update \ - && apt-get install -y curl ca-certificates less locales jq vim-tiny gnupg1 cron runit dumb-init libcap2-bin \ + && apt-get install -y curl ca-certificates less locales jq vim-tiny gnupg1 cron runit dumb-init libcap2-bin rsync \ && ln -s chpst /usr/bin/envdir \ # Make it possible to use the following utilities without root && setcap 'cap_sys_nice+ep' /usr/bin/chrt \ @@ -528,7 +528,7 @@ RUN sed -i "s|/var/lib/postgresql.*|$PGHOME:/bin/bash|" /etc/passwd \ && usermod -a -G root postgres; \ fi -COPY scripts bootstrap /scripts/ +COPY scripts bootstrap major_upgrade /scripts/ COPY launch.sh / CMD ["/bin/sh", "/launch.sh", "init"] diff --git a/postgres-appliance/bootstrap/clone_with_wale.py b/postgres-appliance/bootstrap/clone_with_wale.py index 0e7af7b9a..254374038 100755 --- a/postgres-appliance/bootstrap/clone_with_wale.py +++ b/postgres-appliance/bootstrap/clone_with_wale.py @@ -5,6 +5,7 @@ import logging import os import re +import shlex import subprocess import sys @@ -61,12 +62,9 @@ def fix_output(output): yield '\t'.join(line.split()) -def choose_backup(output, recovery_target_time): +def choose_backup(backup_list, recovery_target_time): """ pick up the latest backup file starting before time recovery_target_time""" - reader = csv.DictReader(fix_output(output), dialect='excel-tab') - backup_list = list(reader) - if len(backup_list) <= 0: - raise Exception("wal-e could not found any backups") + match_timestamp = match = None for backup in backup_list: last_modified = parse(backup['last_modified']) @@ -74,23 +72,105 @@ def choose_backup(output, recovery_target_time): if 
match is None or last_modified > match_timestamp: match = backup match_timestamp = last_modified - if match is None: - raise Exception("wal-e could not found any backups prior to the point in time {0}".format(recovery_target_time)) - return match['name'] + if match is not None: + return match['name'] + + +def list_backups(env): + backup_list_cmd = build_wale_command('backup-list') + output = subprocess.check_output(backup_list_cmd, env=env) + reader = csv.DictReader(fix_output(output), dialect='excel-tab') + return list(reader) + + +def get_clone_envdir(): + from spilo_commons import get_patroni_config + + config = get_patroni_config() + restore_command = shlex.split(config['bootstrap']['clone_with_wale']['recovery_conf']['restore_command']) + if len(restore_command) > 4 and restore_command[0] == 'envdir': + return restore_command[1] + raise Exception('Failed to find clone envdir') + + +def get_possible_versions(): + from spilo_commons import LIB_DIR, get_binary_version, get_bin_dir, get_patroni_config + + config = get_patroni_config() + + max_version = float(get_binary_version(config.get('postgresql', {}).get('bin_dir'))) + + versions = {} + + for d in os.listdir(LIB_DIR): + try: + ver = get_binary_version(get_bin_dir(d)) + fver = float(ver) + if fver <= max_version: + versions[fver] = ver + except Exception: + pass + + # return possible versions in reversed order, i.e. 
12, 11, 10, 9.6, and so on + return [ver for _, ver in sorted(versions.items(), reverse=True)] + + +def get_wale_environments(env): + use_walg = env.get('USE_WALG_RESTORE') == 'true' + prefix = 'WALG_' if use_walg else 'WALE_' + # len('WALE__PREFIX') = 12 + names = [name for name in env.keys() if name.endswith('_PREFIX') and name.startswith(prefix) and len(name) > 12] + if len(names) != 1: + raise Exception('Found find {0} {1}*_PREFIX environment variables, expected 1' + .format(len(names), prefix)) + + name = names[0] + value = env[name].rstrip('/') + + if '/spilo/' in value and value.endswith('/wal'): # path crafted in the configure_spilo.py? + # Try all versions descending if we don't know the version of the source cluster + for version in get_possible_versions(): + yield name, '{0}/{1}/'.format(value, version) + + # Last, try the original value + yield name, env[name] + + +def find_backup(recovery_target_time, env): + old_value = None + for name, value in get_wale_environments(env): + if not old_value: + old_value = env[name] + env[name] = value + backup_list = list_backups(env) + if backup_list: + if recovery_target_time: + backup = choose_backup(backup_list, recovery_target_time) + if backup: + return backup, (name if value != old_value else None) + else: # We assume that the LATEST backup will be for the biggest postgres version! 
+ return 'LATEST', (name if value != old_value else None) + if recovery_target_time: + raise Exception('Could not find any backups prior to the point in time {0}'.format(recovery_target_time)) + raise Exception('Could not find any backups') def run_clone_from_s3(options): - backup_name = 'LATEST' - if options.recovery_target_time: - backup_list_cmd = build_wale_command('backup-list') - backup_list = subprocess.check_output(backup_list_cmd) - backup_name = choose_backup(backup_list, options.recovery_target_time) + env = os.environ.copy() + + backup_name, update_envdir = find_backup(options.recovery_target_time, env) + backup_fetch_cmd = build_wale_command('backup-fetch', options.datadir, backup_name) logger.info("cloning cluster %s using %s", options.name, ' '.join(backup_fetch_cmd)) if not options.dry_run: - ret = subprocess.call(backup_fetch_cmd) + ret = subprocess.call(backup_fetch_cmd, env=env) if ret != 0: raise Exception("wal-e backup-fetch exited with exit code {0}".format(ret)) + + if update_envdir: # We need to update file in the clone envdir or restore_command will fail! 
+ envdir = get_clone_envdir() + with open(os.path.join(envdir, update_envdir), 'w') as f: + f.write(env[update_envdir]) return 0 diff --git a/postgres-appliance/bootstrap/maybe_pg_upgrade.py b/postgres-appliance/bootstrap/maybe_pg_upgrade.py index b566efe81..5a2405d8b 100644 --- a/postgres-appliance/bootstrap/maybe_pg_upgrade.py +++ b/postgres-appliance/bootstrap/maybe_pg_upgrade.py @@ -9,12 +9,12 @@ def main(): from pg_upgrade import PostgresqlUpgrade from patroni.config import Config from patroni.utils import polling_loop + from spilo_commons import get_binary_version config = Config(sys.argv[1]) - config['postgresql'].update({'callbacks': {}, 'pg_ctl_timeout': 3600*24*7}) - upgrade = PostgresqlUpgrade(config['postgresql']) + upgrade = PostgresqlUpgrade(config) - bin_version = upgrade.get_binary_version() + bin_version = get_binary_version(upgrade.pgcommand('')) cluster_version = upgrade.get_cluster_version() if cluster_version == bin_version: @@ -37,13 +37,9 @@ def main(): upgrade.stop(block_callbacks=True, checkpoint=False) raise Exception('Failed to run bootstrap.post_init') - locale = upgrade.query('SHOW lc_collate').fetchone()[0] - encoding = upgrade.query('SHOW server_encoding').fetchone()[0] - initdb_config = [{'locale': locale}, {'encoding': encoding}] - if upgrade.query("SELECT current_setting('data_checksums')::bool").fetchone()[0]: - initdb_config.append('data-checksums') + if not upgrade.prepare_new_pgdata(bin_version): + raise Exception('initdb failed') - logger.info('Dropping objects from the cluster which could be incompatible') try: upgrade.drop_possibly_incompatible_objects() except Exception: @@ -54,15 +50,18 @@ def main(): if not upgrade.stop(block_callbacks=True, checkpoint=False): raise Exception('Failed to stop the cluster with old postgres') - logger.info('initdb config: %s', initdb_config) - - logger.info('Executing pg_upgrade') - if not upgrade.do_upgrade(bin_version, initdb_config): + if not upgrade.do_upgrade(): raise Exception('Failed 
to upgrade cluster from {0} to {1}'.format(cluster_version, bin_version)) logger.info('Starting the cluster with new postgres after upgrade') if not upgrade.start(): raise Exception('Failed to start the cluster with new postgres') + + try: + upgrade.update_extensions() + except Exception as e: + logger.error('Failed to update extensions: %r', e) + upgrade.analyze() @@ -71,8 +70,10 @@ def call_maybe_pg_upgrade(): import os import subprocess + from spilo_commons import PATRONI_CONFIG_FILE + my_name = os.path.abspath(inspect.getfile(inspect.currentframe())) - ret = subprocess.call([sys.executable, my_name, os.path.join(os.getenv('PGHOME'), 'postgres.yml')]) + ret = subprocess.call([sys.executable, my_name, PATRONI_CONFIG_FILE]) if ret != 0: logger.error('%s script failed', my_name) return ret diff --git a/postgres-appliance/bootstrap/pg_upgrade.py b/postgres-appliance/bootstrap/pg_upgrade.py deleted file mode 100644 index d9cb8a191..000000000 --- a/postgres-appliance/bootstrap/pg_upgrade.py +++ /dev/null @@ -1,134 +0,0 @@ -import logging -import os -import shutil -import subprocess -import re -import psutil - -from patroni.postgresql import Postgresql -from patroni.postgresql.connection import get_connection_cursor - -logger = logging.getLogger(__name__) - - -class PostgresqlUpgrade(Postgresql): - - def adjust_shared_preload_libraries(self, version): - shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries') - self._old_config_values['shared_preload_libraries'] = shared_preload_libraries - - extensions = { - 'timescaledb': (9.6, 12), - 'pg_cron': (9.5, 13), - 'pg_stat_kcache': (9.4, 13), - 'pg_partman': (9.4, 13), - 'pg_mon': (11, 13) - } - - filtered = [] - for value in shared_preload_libraries.split(','): - value = value.strip() - if value not in extensions or version >= extensions[value][0] and version <= extensions[value][1]: - filtered.append(value) - self.config.get('parameters')['shared_preload_libraries'] = ','.join(filtered) - 
- def start_old_cluster(self, config, version): - self.set_bin_dir(version) - - version = float(version) - - config[config['method']]['command'] = 'true' - if version < 9.5: # 9.4 and older don't have recovery_target_action - action = config[config['method']].get('recovery_target_action') - config[config['method']]['pause_at_recovery_target'] = str(action == 'pause').lower() - - # make sure we don't archive wals from the old version - self._old_config_values = {'archive_mode': self.config.get('parameters').get('archive_mode')} - self.config.get('parameters')['archive_mode'] = 'off' - - # and don't load shared_preload_libraries which don't exist in the old version - self.adjust_shared_preload_libraries(version) - - return self.bootstrap.bootstrap(config) - - def get_binary_version(self): - version = subprocess.check_output([self.pgcommand('postgres'), '--version']).decode() - version = re.match(r'^[^\s]+ [^\s]+ (\d+)(\.(\d+))?', version) - return '.'.join([version.group(1), version.group(3)]) if int(version.group(1)) < 10 else version.group(1) - - def get_cluster_version(self): - with open(self._version_file) as f: - return f.read().strip() - - def set_bin_dir(self, version): - self._old_bin_dir = self._bin_dir - self._bin_dir = '/usr/lib/postgresql/{0}/bin'.format(version) - - def drop_possibly_incompatible_objects(self): - conn_kwargs = self.config.local_connect_kwargs - for p in ['connect_timeout', 'options']: - conn_kwargs.pop(p, None) - - for d in self.query('SELECT datname FROM pg_catalog.pg_database WHERE datallowconn'): - conn_kwargs['database'] = d[0] - with get_connection_cursor(**conn_kwargs) as cur: - cur.execute("SET synchronous_commit = 'local'") - logger.info('Executing "DROP FUNCTION metric_helpers.pg_stat_statements" in the database="%s"', d[0]) - cur.execute("DROP FUNCTION metric_helpers.pg_stat_statements(boolean) CASCADE") - logger.info('Executing "DROP EXTENSION IF EXISTS amcheck_next" in the database="%s"', d[0]) - cur.execute("DROP EXTENSION 
IF EXISTS amcheck_next") - - def pg_upgrade(self): - upgrade_dir = self._data_dir + '_upgrade' - if os.path.exists(upgrade_dir) and os.path.isdir(upgrade_dir): - shutil.rmtree(upgrade_dir) - - os.makedirs(upgrade_dir) - - old_cwd = os.getcwd() - os.chdir(upgrade_dir) - - pg_upgrade_args = ['-k', '-j', str(psutil.cpu_count()), - '-b', self._old_bin_dir, '-B', self._bin_dir, - '-d', self._old_data_dir, '-D', self._data_dir, - '-O', "-c timescaledb.restoring='on'"] - if 'username' in self.config.superuser: - pg_upgrade_args += ['-U', self.config.superuser['username']] - - if subprocess.call([self.pgcommand('pg_upgrade')] + pg_upgrade_args) == 0: - os.chdir(old_cwd) - shutil.rmtree(upgrade_dir) - shutil.rmtree(self._old_data_dir) - return True - - def do_upgrade(self, version, initdb_config): - self._data_dir = os.path.abspath(self._data_dir) - self._old_data_dir = self._data_dir + '_old' - os.rename(self._data_dir, self._old_data_dir) - - self.set_bin_dir(version) - - # restore original values of archive_mode and shared_preload_libraries - for name, value in self._old_config_values.items(): - if value is None: - self.config.get('parameters').pop(name) - else: - self.config.get('parameters')[name] = value - - if not self.bootstrap._initdb(initdb_config): - return False - - # Copy old configs. XXX: some parameters might be incompatible! 
- for f in os.listdir(self._old_data_dir): - if f.startswith('postgresql.') or f.startswith('pg_hba.conf') or f == 'patroni.dynamic.json': - shutil.copy(os.path.join(self._old_data_dir, f), os.path.join(self._data_dir, f)) - - self.config.write_postgresql_conf() - - return self.pg_upgrade() - - def analyze(self): - vacuumdb_args = ['-a', '-Z', '-j', str(psutil.cpu_count())] - if 'username' in self.config.superuser: - vacuumdb_args += ['-U', self.config.superuser['username']] - subprocess.call([self.pgcommand('vacuumdb')] + vacuumdb_args) diff --git a/postgres-appliance/major_upgrade/inplace_upgrade.py b/postgres-appliance/major_upgrade/inplace_upgrade.py new file mode 100644 index 000000000..de373ffa7 --- /dev/null +++ b/postgres-appliance/major_upgrade/inplace_upgrade.py @@ -0,0 +1,748 @@ +#!/usr/bin/env python +import json +import logging +import os +import psutil +import psycopg2 +import shlex +import shutil +import subprocess +import sys +import time +import yaml + +from collections import defaultdict +from threading import Thread +from multiprocessing.pool import ThreadPool + +logger = logging.getLogger(__name__) + +RSYNC_PORT = 5432 + + +def patch_wale_prefix(value, new_version): + from spilo_commons import is_valid_pg_version + + if '/spilo/' in value and '/wal/' in value: # path crafted in the configure_spilo.py? 
+ basename, old_version = os.path.split(value.rstrip('/')) + if is_valid_pg_version(old_version) and old_version != new_version: + return os.path.join(basename, new_version) + return value + + +def update_configs(new_version): + from spilo_commons import append_extentions, get_bin_dir, get_patroni_config, write_file, write_patroni_config + + config = get_patroni_config() + + config['postgresql']['bin_dir'] = get_bin_dir(new_version) + + version = float(new_version) + shared_preload_libraries = config['postgresql'].get('parameters', {}).get('shared_preload_libraries') + if shared_preload_libraries is not None: + config['postgresql']['parameters']['shared_preload_libraries'] =\ + append_extentions(shared_preload_libraries, version) + + extwlist_extensions = config['postgresql'].get('parameters', {}).get('extwlist.extensions') + if extwlist_extensions is not None: + config['postgresql']['parameters']['extwlist.extensions'] =\ + append_extentions(extwlist_extensions, version, True) + + write_patroni_config(config, True) + + # update wal-e/wal-g envdir files + restore_command = shlex.split(config['postgresql'].get('recovery_conf', {}).get('restore_command', '')) + if len(restore_command) > 4 and restore_command[0] == 'envdir': + envdir = restore_command[1] + + try: + for name in os.listdir(envdir): + # len('WALE__PREFIX') = 12 + if len(name) > 12 and name.endswith('_PREFIX') and name[:5] in ('WALE_', 'WALG_'): + name = os.path.join(envdir, name) + try: + with open(name) as f: + value = f.read().strip() + new_value = patch_wale_prefix(value, new_version) + if new_value != value: + write_file(new_value, name, True) + except Exception as e: + logger.error('Failed to process %s: %r', name, e) + except Exception: + pass + else: + return envdir + + +def kill_patroni(): + logger.info('Restarting patroni') + patroni = next(iter(filter(lambda p: p.info['name'] == 'patroni', psutil.process_iter(['name']))), None) + if patroni: + patroni.kill() + + +class InplaceUpgrade(object): + 
+ def __init__(self, config): + from patroni.dcs import get_dcs + from patroni.request import PatroniRequest + from pg_upgrade import PostgresqlUpgrade + + self.config = config + self.postgresql = PostgresqlUpgrade(config) + + self.cluster_version = self.postgresql.get_cluster_version() + self.desired_version = self.get_desired_version() + + self.upgrade_required = float(self.cluster_version) < float(self.desired_version) + + self.paused = False + self.new_data_created = False + self.upgrade_complete = False + self.rsyncd_configs_created = False + self.rsyncd_started = False + + if self.upgrade_required: + self.dcs = get_dcs(config) + self.request = PatroniRequest(config, True) + + @staticmethod + def get_desired_version(): + from spilo_commons import get_bin_dir, get_binary_version + + try: + spilo_configuration = yaml.safe_load(os.environ.get('SPILO_CONFIGURATION', '')) + bin_dir = spilo_configuration.get('postgresql', {}).get('bin_dir') + except Exception: + bin_dir = None + + if not bin_dir and os.environ.get('PGVERSION'): + bin_dir = get_bin_dir(os.environ['PGVERSION']) + + return get_binary_version(bin_dir) + + def check_patroni_api(self, member): + try: + response = self.request(member, timeout=2, retries=0) + return response.status == 200 + except Exception as e: + return logger.error('API request to %s name failed: %r', member.name, e) + + def toggle_pause(self, paused): + from patroni.utils import polling_loop + + cluster = self.dcs.get_cluster() + config = cluster.config.data.copy() + if cluster.is_paused() == paused: + return logger.error('Cluster is %spaused, can not continue', ('' if paused else 'not ')) + + config['pause'] = paused + if not self.dcs.set_config_value(json.dumps(config, separators=(',', ':')), cluster.config.index): + return logger.error('Failed to pause cluster, can not continue') + + self.paused = paused + + old = {m.name: m.index for m in cluster.members if m.api_url} + ttl = cluster.config.data.get('ttl', self.dcs.ttl) + for _ in 
polling_loop(ttl + 1): + cluster = self.dcs.get_cluster() + if all(m.data.get('pause', False) == paused for m in cluster.members if m.name in old): + logger.info('Maintenance mode %s', ('enabled' if paused else 'disabled')) + return True + + remaining = [m.name for m in cluster.members if m.data.get('pause', False) != paused + and m.name in old and old[m.name] != m.index] + if remaining: + return logger.error("%s members didn't recognized pause state after %s seconds", remaining, ttl) + + def resume_cluster(self): + if self.paused: + try: + logger.info('Disabling maintenance mode') + self.toggle_pause(False) + except Exception as e: + logger.error('Failed to resume cluster: %r', e) + + def ensure_replicas_state(self, cluster): + """ + This method checks the satatus of all replicas and also tries to open connections + to all of them and puts into the `self.replica_connections` dict for a future usage. + """ + self.replica_connections = {} + streaming = {a: l for a, l in self.postgresql.query( + ("SELECT client_addr, pg_catalog.pg_{0}_{1}_diff(pg_catalog.pg_current_{0}_{1}()," + " COALESCE(replay_{1}, '0/0'))::bigint FROM pg_catalog.pg_stat_replication") + .format(self.postgresql.wal_name, self.postgresql.lsn_name))} + + def ensure_replica_state(member): + ip = member.conn_kwargs().get('host') + lag = streaming.get(ip) + if lag is None: + return logger.error('Member %s is not streaming from the primary', member.name) + if lag > 16*1024*1024: + return logger.error('Replication lag %s on member %s is too high', lag, member.name) + + if not self.check_patroni_api(member): + return logger.error('Patroni on %s is not healthy', member.name) + + conn_kwargs = member.conn_kwargs(self.postgresql.config.superuser) + conn_kwargs['options'] = '-c statement_timeout=0 -c search_path=' + conn_kwargs.pop('connect_timeout', None) + + conn = psycopg2.connect(**conn_kwargs) + conn.autocommit = True + cur = conn.cursor() + cur.execute('SELECT pg_catalog.pg_is_in_recovery()') + if not 
cur.fetchone()[0]: + return logger.error('Member %s is not running as replica!', member.name) + self.replica_connections[member.name] = (ip, cur) + return True + + return all(ensure_replica_state(member) for member in cluster.members if member.name != self.postgresql.name) + + def sanity_checks(self, cluster): + if not cluster.initialize: + return logger.error('Upgrade can not be triggered because the cluster is not initialized') + + if len(cluster.members) != self.replica_count: + return logger.error('Upgrade can not be triggered because the number of replicas does not match (%s != %s)', + len(cluster.members), self.replica_count) + if cluster.is_paused(): + return logger.error('Upgrade can not be triggered because Patroni is in maintenance mode') + + lock_owner = cluster.leader and cluster.leader.name + if lock_owner != self.postgresql.name: + return logger.error('Upgrade can not be triggered because the current node does not own the leader lock') + + return self.ensure_replicas_state(cluster) + + def remove_initialize_key(self): + from patroni.utils import polling_loop + + for _ in polling_loop(10): + cluster = self.dcs.get_cluster() + if cluster.initialize is None: + return True + logging.info('Removing initialize key') + if self.dcs.cancel_initialization(): + return True + logger.error('Failed to remove initialize key') + + def wait_for_replicas(self, checkpoint_lsn): + from patroni.utils import polling_loop + + logger.info('Waiting for replica nodes to catch up with primary') + + query = ("SELECT pg_catalog.pg_{0}_{1}_diff(pg_catalog.pg_last_{0}_replay_{1}()," + " '0/0')::bigint").format(self.postgresql.wal_name, self.postgresql.lsn_name) + + status = {} + + for _ in polling_loop(60): + synced = True + for name, (_, cur) in self.replica_connections.items(): + prev = status.get(name) + if prev and prev >= checkpoint_lsn: + continue + + cur.execute(query) + lsn = cur.fetchone()[0] + status[name] = lsn + + if lsn < checkpoint_lsn: + synced = False + + if synced: 
+ logger.info('All replicas are ready') + return True + + for name in self.replica_connections.keys(): + lsn = status.get(name) + if not lsn or lsn < checkpoint_lsn: + logger.error('Node %s did not catched up. Lag=%s', name, checkpoint_lsn - lsn) + + def create_rsyncd_configs(self): + self.rsyncd_configs_created = True + self.rsyncd_conf_dir = '/run/rsync' + self.rsyncd_feedback_dir = os.path.join(self.rsyncd_conf_dir, 'feedback') + + if not os.path.exists(self.rsyncd_feedback_dir): + os.makedirs(self.rsyncd_feedback_dir) + + self.rsyncd_conf = os.path.join(self.rsyncd_conf_dir, 'rsyncd.conf') + secrets_file = os.path.join(self.rsyncd_conf_dir, 'rsyncd.secrets') + + auth_users = ','.join(self.replica_connections.keys()) + replica_ips = ','.join(str(v[0]) for v in self.replica_connections.values()) + + with open(self.rsyncd_conf, 'w') as f: + f.write("""port = {0} +use chroot = false + +[pgroot] +path = {1} +read only = true +timeout = 300 +post-xfer exec = echo $RSYNC_EXIT_STATUS > {2}/$RSYNC_USER_NAME +auth users = {3} +secrets file = {4} +hosts allow = {5} +hosts deny = * +""".format(RSYNC_PORT, os.path.dirname(self.postgresql.data_dir), + self.rsyncd_feedback_dir, auth_users, secrets_file, replica_ips)) + + with open(secrets_file, 'w') as f: + for name in self.replica_connections.keys(): + f.write('{0}:{1}\n'.format(name, self.postgresql.config.replication['password'])) + os.chmod(secrets_file, 0o600) + + def start_rsyncd(self): + self.create_rsyncd_configs() + self.rsyncd = subprocess.Popen(['rsync', '--daemon', '--no-detach', '--config=' + self.rsyncd_conf]) + self.rsyncd_started = True + + def stop_rsyncd(self): + if self.rsyncd_started: + logger.info('Stopping rsyncd') + try: + self.rsyncd.kill() + self.rsyncd_started = False + except Exception as e: + return logger.error('Failed to kill rsyncd: %r', e) + + if self.rsyncd_configs_created and os.path.exists(self.rsyncd_conf_dir): + try: + shutil.rmtree(self.rsyncd_conf_dir) + self.rsyncd_configs_created = 
False + except Exception as e: + logger.error('Failed to remove %s: %r', self.rsync_conf_dir, e) + + def checkpoint(self, member): + name, (_, cur) = member + try: + cur.execute('CHECKPOINT') + return name, True + except Exception as e: + logger.error('CHECKPOINT on % failed: %r', name, e) + return name, False + + def checkpoint_replicas(self): + logger.info('Executing CHECKPOINT on replicas %s', ','.join(self.replica_connections.keys())) + pool = ThreadPool(len(self.replica_connections)) + results = pool.map(self.checkpoint, self.replica_connections.items()) # Run CHECKPOINT on replicas in parallel + pool.close() + pool.join() + + for name, status in results: + if not status: + self.replica_connections.pop(name) + + return self.replica_connections + + def rsync_replicas(self, primary_ip): + from patroni.utils import polling_loop + + if not self.checkpoint_replicas(): + return + + logger.info('Notifying replicas %s to start rsync', ','.join(self.replica_connections.keys())) + ret = True + status = {} + for name, (ip, cur) in self.replica_connections.items(): + try: + cur.execute("SELECT pg_catalog.pg_backend_pid()") + pid = cur.fetchone()[0] + # We use the COPY TO PROGRAM "hack" to start the rsync on replicas. + # There are a few important moments: + # 1. The script is started as a child process of postgres backend, which + # is running with the clean environment. I.e., the script will not see + # values of PGVERSION, SPILO_CONFIGURATION, KUBERNETES_SERVICE_HOST + # 2. Since access to the DCS might not be possible with pass the primary_ip + # 3. The desired_version passed explicitly to guaranty 100% match with the master + # 4. In order to protect from the accidental "rsync" we pass the pid of postgres backend. + # The script will check that it is the child of the very specific postgres process. 
+ cur.execute("COPY (SELECT) TO PROGRAM 'nohup {0} /scripts/inplace_upgrade.py {1} {2} {3}'" + .format(sys.executable, self.desired_version, primary_ip, pid)) + conn = cur.connection + cur.close() + conn.close() + except Exception as e: + logger.error('COPY TO PROGRAM on %s failed: %r', name, e) + status[name] = False + ret = False + + for name in status.keys(): + self.replica_connections.pop(name) + + logger.info('Waiting for replicas rsync to complete') + status.clear() + for _ in polling_loop(300): + synced = True + for name in self.replica_connections.keys(): + feedback = os.path.join(self.rsyncd_feedback_dir, name) + if name not in status and os.path.exists(feedback): + with open(feedback) as f: + status[name] = f.read().strip() + + if name not in status: + synced = False + if synced: + break + + for name in self.replica_connections.keys(): + result = status.get(name) + if result is None: + logger.error('Did not received rsync feedback from %s after 300 seconds', name) + ret = False + elif not result.startswith('0'): + logger.error('Rsync on %s finished with code %s', name, result) + ret = False + return ret + + def wait_replica_restart(self, member): + from patroni.utils import polling_loop + + for _ in polling_loop(10): + try: + response = self.request(member, timeout=2, retries=0) + if response.status == 200: + data = json.loads(response.data.decode('utf-8')) + database_system_identifier = data.get('database_system_identifier') + if database_system_identifier and database_system_identifier != self._old_sysid: + return member.name + except Exception: + pass + logger.error('Patroni on replica %s was not restarted in 10 seconds', member.name) + + def wait_replicas_restart(self, cluster): + members = [member for member in cluster.members if member.name in self.replica_connections] + logger.info('Waiting for restart of patroni on replicas %s', ', '.join(m.name for m in members)) + pool = ThreadPool(len(members)) + results = pool.map(self.wait_replica_restart, 
members) + pool.close() + pool.join() + logger.info(' %s successfully restarted', results) + return all(results) + + def reset_custom_statistics_target(self): + from patroni.postgresql.connection import get_connection_cursor + + logger.info('Resetting non-default statistics target before analyze') + self._statistics = defaultdict(lambda: defaultdict(dict)) + + conn_kwargs = self.postgresql.local_conn_kwargs + + for d in self.postgresql.query('SELECT datname FROM pg_catalog.pg_database WHERE datallowconn'): + conn_kwargs['database'] = d[0] + with get_connection_cursor(**conn_kwargs) as cur: + cur.execute('SELECT attrelid::regclass, quote_ident(attname), attstattarget ' + 'FROM pg_catalog.pg_attribute WHERE attnum > 0 AND NOT attisdropped AND attstattarget > 0') + for table, column, target in cur.fetchall(): + query = 'ALTER TABLE {0} ALTER COLUMN {1} SET STATISTICS -1'.format(table, column) + logger.info("Executing '%s' in the database=%s. Old value=%s", query, d[0], target) + cur.execute(query) + self._statistics[d[0]][table][column] = target + + def restore_custom_statistics_target(self): + from patroni.postgresql.connection import get_connection_cursor + + if not self._statistics: + return + + conn_kwargs = self.postgresql.local_conn_kwargs + + logger.info('Restoring default statistics targets after upgrade') + for db, val in self._statistics.items(): + conn_kwargs['database'] = db + with get_connection_cursor(**conn_kwargs) as cur: + for table, val in val.items(): + for column, target in val.items(): + query = 'ALTER TABLE {0} ALTER COLUMN {1} SET STATISTICS {2}'.format(table, column, target) + logger.info("Executing '%s' in the database=%s", query, db) + try: + cur.execute(query) + except Exception: + logger.error("Failed to execute '%s'", query) + + def reanalyze(self): + from patroni.postgresql.connection import get_connection_cursor + + if not self._statistics: + return + + conn_kwargs = self.postgresql.local_conn_kwargs + + for db, val in 
self._statistics.items(): + conn_kwargs['database'] = db + with get_connection_cursor(**conn_kwargs) as cur: + for table in val.keys(): + query = 'ANALYZE {0}'.format(table) + logger.info("Executing '%s' in the database=%s", query, db) + try: + cur.execute(query) + except Exception: + logger.error("Failed to execute '%s'", query) + + def analyze(self): + try: + self.reset_custom_statistics_target() + except Exception as e: + logger.error('Failed to reset custom statistics targets: %r', e) + self.postgresql.analyze(True) + try: + self.restore_custom_statistics_target() + except Exception as e: + logger.error('Failed to restore custom statistics targets: %r', e) + + def do_upgrade(self): + from patroni.utils import polling_loop + + if not self.upgrade_required: + logger.info('Current version=%s, desired version=%s. Upgrade is not required', + self.cluster_version, self.desired_version) + return True + + if not (self.postgresql.is_running() and self.postgresql.is_leader()): + return logger.error('PostgreSQL is not running or in recovery') + + cluster = self.dcs.get_cluster() + + if not self.sanity_checks(cluster): + return False + + self._old_sysid = self.postgresql.sysid # remember old sysid + + logger.info('Cluster %s is ready to be upgraded', self.postgresql.scope) + if not self.postgresql.prepare_new_pgdata(self.desired_version): + return logger.error('initdb failed') + + if not self.postgresql.pg_upgrade(check=True): + return logger.error('pg_upgrade --check failed, more details in the %s_upgrade', self.postgresql.data_dir) + + try: + self.postgresql.drop_possibly_incompatible_objects() + except Exception: + return logger.error('Failed to drop possibly incompatible objects') + + logging.info('Enabling maintenance mode') + if not self.toggle_pause(True): + return False + + logger.info('Doing a clean shutdown of the cluster before pg_upgrade') + downtime_start = time.time() + if not self.postgresql.stop(block_callbacks=True): + return logger.error('Failed to stop 
the cluster before pg_upgrade') + + if self.replica_connections: + checkpoint_lsn = int(self.postgresql.latest_checkpoint_location()) + logger.info('Latest checkpoint location: %s', checkpoint_lsn) + + logger.info('Starting rsyncd') + self.start_rsyncd() + + if not self.wait_for_replicas(checkpoint_lsn): + return False + + if not (self.rsyncd.pid and self.rsyncd.poll() is None): + return logger.error('Failed to start rsyncd') + + if not self.postgresql.pg_upgrade(): + return logger.error('Failed to upgrade cluster from %s to %s', self.cluster_version, self.desired_version) + + self.postgresql.switch_pgdata() + self.upgrade_complete = True + + logger.info('Updating configuration files') + envdir = update_configs(self.desired_version) + + member = cluster.get_member(self.postgresql.name) + if self.replica_connections: + primary_ip = member.conn_kwargs().get('host') + rsync_start = time.time() + try: + ret = self.rsync_replicas(primary_ip) + except Exception as e: + logger.error('rsync failed: %r', e) + ret = False + logger.info('Rsync took %s seconds', time.time() - rsync_start) + + self.stop_rsyncd() + time.sleep(2) # Give replicas a bit of time to switch PGDATA + + self.remove_initialize_key() + kill_patroni() + self.remove_initialize_key() + + time.sleep(1) + for _ in polling_loop(10): + if self.check_patroni_api(member): + break + else: + logger.error('Patroni REST API on primary is not accessible after 10 seconds') + + logger.info('Starting the primary postgres up') + for _ in polling_loop(10): + try: + result = self.request(member, 'post', 'restart', {}) + logger.info(' %s %s', result.status, result.data.decode('utf-8')) + if result.status < 300: + break + except Exception as e: + logger.error('POST /restart failed: %r', e) + else: + logger.error('Failed to start primary after upgrade') + + logger.info('Upgrade downtime: %s', time.time() - downtime_start) + + try: + self.postgresql.update_extensions() + except Exception as e: + logger.error('Failed to update 
extensions: %r', e) + + # start analyze early + analyze_thread = Thread(target=self.analyze) + analyze_thread.start() + + self.wait_replicas_restart(cluster) + + self.resume_cluster() + + analyze_thread.join() + + self.reanalyze() + + logger.info('Total upgrade time (with analyze): %s', time.time() - downtime_start) + self.postgresql.bootstrap.call_post_bootstrap(self.config['bootstrap']) + self.postgresql.cleanup_old_pgdata() + + if envdir: + self.start_backup(envdir) + + return ret if self.replica_connections else True + + def post_cleanup(self): + self.stop_rsyncd() + self.resume_cluster() + + if self.new_data_created: + try: + self.postgresql.cleanup_new_pgdata() + except Exception as e: + logger.error('Failed to remove new PGDATA %r', e) + + def try_upgrade(self, replica_count): + try: + self.replica_count = replica_count + return self.do_upgrade() + finally: + self.post_cleanup() + + def start_backup(self, envdir): + logger.info('Initiating a new backup...') + if not os.fork(): + subprocess.call(['nohup', 'envdir', envdir, '/scripts/postgres_backup.sh', self.postgresql.data_dir], + stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT) + + +# this function will be running in a clean environment, therefore we can't rely on DCS connection +def rsync_replica(config, desired_version, primary_ip, pid): + from pg_upgrade import PostgresqlUpgrade + from patroni.utils import polling_loop + + me = psutil.Process() + + # check that we are the child of postgres backend + if me.parent().pid != pid and me.parent().parent().pid != pid: + return 1 + + backend = psutil.Process(pid) + if 'postgres' not in backend.name(): + return 1 + + postgresql = PostgresqlUpgrade(config) + + if postgresql.get_cluster_version() == desired_version: + return 0 + + if os.fork(): + return 0 + + # Wait until the remote side will close the connection and backend process exits + for _ in polling_loop(10): + if not backend.is_running(): + break + else: + logger.warning('Backend did not exit after 10 seconds') + + sysid = 
postgresql.sysid # remember old sysid + + if not postgresql.stop(block_callbacks=True): + logger.error('Failed to stop the cluster before rsync') + return 1 + + postgresql.switch_pgdata() + + update_configs(desired_version) + + env = os.environ.copy() + env['RSYNC_PASSWORD'] = postgresql.config.replication['password'] + if subprocess.call(['rsync', '--archive', '--delete', '--hard-links', '--size-only', '--omit-dir-times', + '--no-inc-recursive', '--include=/data/***', '--include=/data_old/***', + '--exclude=/data/pg_xlog/*', '--exclude=/data_old/pg_xlog/*', + '--exclude=/data/pg_wal/*', '--exclude=/data_old/pg_wal/*', '--exclude=*', + 'rsync://{0}@{1}:{2}/pgroot'.format(postgresql.name, primary_ip, RSYNC_PORT), + os.path.dirname(postgresql.data_dir)], env=env) != 0: + logger.error('Failed to rsync from %s', primary_ip) + postgresql.switch_back_pgdata() + # XXX: rollback configs? + return 1 + + conn_kwargs = {k: v for k, v in postgresql.config.replication.items() if v is not None} + if 'username' in conn_kwargs: + conn_kwargs['user'] = conn_kwargs.pop('username') + + # If restart Patroni right now there is a chance that it will exit due to the sysid mismatch. + # Due to cleaned environment we can't always use DCS on replicas in this script, therefore + # the good indicator of initialize key being deleted/updated is running primary after the upgrade. + for _ in polling_loop(300): + try: + with postgresql.get_replication_connection_cursor(primary_ip, **conn_kwargs) as cur: + cur.execute('IDENTIFY_SYSTEM') + if cur.fetchone()[0] != sysid: + break + except Exception: + pass + + # If the cluster was unpaused earlier than we restarted Patroni, it might have created + # the recovery.conf file and tried (and failed) to start the cluster up using wrong binaries. + # In case of upgrade to 12+ presence of PGDATA/recovery.conf will not allow postgres to start. + # We remove the recovery.conf and restart Patroni in order to make sure it is using correct config. 
+ postgresql.config.remove_recovery_conf() + kill_patroni() + postgresql.config.remove_recovery_conf() + + return postgresql.cleanup_old_pgdata() + + +def main(): + from patroni.config import Config + from spilo_commons import PATRONI_CONFIG_FILE + + config = Config(PATRONI_CONFIG_FILE) + + if len(sys.argv) == 4: + desired_version = sys.argv[1] + primary_ip = sys.argv[2] + pid = int(sys.argv[3]) + return rsync_replica(config, desired_version, primary_ip, pid) + elif len(sys.argv) == 2: + replica_count = int(sys.argv[1]) + upgrade = InplaceUpgrade(config) + return 0 if upgrade.try_upgrade(replica_count) else 1 + else: + return 2 + + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s inplace_upgrade %(levelname)s: %(message)s', level='INFO') + sys.exit(main()) diff --git a/postgres-appliance/major_upgrade/pg_upgrade.py b/postgres-appliance/major_upgrade/pg_upgrade.py new file mode 100644 index 000000000..b5bf17639 --- /dev/null +++ b/postgres-appliance/major_upgrade/pg_upgrade.py @@ -0,0 +1,232 @@ +import logging +import os +import shutil +import subprocess +import psutil + +from patroni.postgresql import Postgresql + +logger = logging.getLogger(__name__) + + +class _PostgresqlUpgrade(Postgresql): + + def adjust_shared_preload_libraries(self, version): + from spilo_commons import adjust_extensions + + shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries') + self._old_config_values['shared_preload_libraries'] = shared_preload_libraries + + if shared_preload_libraries: + self.config.get('parameters')['shared_preload_libraries'] =\ + adjust_extensions(shared_preload_libraries, version) + + def start_old_cluster(self, config, version): + self.set_bin_dir(version) + + version = float(version) + + config[config['method']]['command'] = 'true' + if version < 9.5: # 9.4 and older don't have recovery_target_action + action = config[config['method']].get('recovery_target_action') + 
config[config['method']]['pause_at_recovery_target'] = str(action == 'pause').lower() + + # make sure we don't archive wals from the old version + self._old_config_values = {'archive_mode': self.config.get('parameters').get('archive_mode')} + self.config.get('parameters')['archive_mode'] = 'off' + + # and don't load shared_preload_libraries which don't exist in the old version + self.adjust_shared_preload_libraries(version) + + return self.bootstrap.bootstrap(config) + + def get_cluster_version(self): + with open(self._version_file) as f: + return f.read().strip() + + def set_bin_dir(self, version): + from spilo_commons import get_bin_dir + + self._old_bin_dir = self._bin_dir + self._bin_dir = get_bin_dir(version) + + @property + def local_conn_kwargs(self): + conn_kwargs = self.config.local_connect_kwargs + conn_kwargs['options'] = '-c synchronous_commit=local -c statement_timeout=0 -c search_path=' + conn_kwargs.pop('connect_timeout', None) + return conn_kwargs + + def drop_possibly_incompatible_objects(self): + from patroni.postgresql.connection import get_connection_cursor + + logger.info('Dropping objects from the cluster which could be incompatible') + conn_kwargs = self.local_conn_kwargs + + for d in self.query('SELECT datname FROM pg_catalog.pg_database WHERE datallowconn'): + conn_kwargs['database'] = d[0] + with get_connection_cursor(**conn_kwargs) as cur: + logger.info('Executing "DROP FUNCTION metric_helpers.pg_stat_statements" in the database="%s"', d[0]) + cur.execute("DROP FUNCTION IF EXISTS metric_helpers.pg_stat_statements(boolean) CASCADE") + logger.info('Executing "DROP EXTENSION pg_stat_kcache"') + cur.execute("DROP EXTENSION IF EXISTS pg_stat_kcache") + logger.info('Executing "DROP EXTENSION pg_stat_statements"') + cur.execute("DROP EXTENSION IF EXISTS pg_stat_statements") + logger.info('Executing "DROP EXTENSION IF EXISTS amcheck_next" in the database="%s"', d[0]) + cur.execute("DROP EXTENSION IF EXISTS amcheck_next") + if d[0] == 'postgres': 
+ logger.info('Executing "DROP TABLE postgres_log CASCADE" in the database=postgres') + cur.execute('DROP TABLE IF EXISTS public.postgres_log CASCADE') + cur.execute("SELECT oid::regclass FROM pg_catalog.pg_class WHERE relpersistence = 'u'") + for unlogged in cur.fetchall(): + logger.info('Truncating unlogged table %s', unlogged[0]) + try: + cur.execute('TRUNCATE {0}'.format(unlogged[0])) + except Exception as e: + logger.error('Failed: %r', e) + + def update_extensions(self): + from patroni.postgresql.connection import get_connection_cursor + + conn_kwargs = self.local_conn_kwargs + + for d in self.query('SELECT datname FROM pg_catalog.pg_database WHERE datallowconn'): + conn_kwargs['database'] = d[0] + with get_connection_cursor(**conn_kwargs) as cur: + cur.execute('SELECT quote_ident(extname) FROM pg_catalog.pg_extension') + for extname in cur.fetchall(): + query = 'ALTER EXTENSION {0} UPDATE'.format(extname[0]) + logger.info("Executing '%s' in the database=%s", query, d[0]) + try: + cur.execute(query) + except Exception as e: + logger.error('Failed: %r', e) + + @staticmethod + def remove_new_data(d): + if d.endswith('_new') and os.path.isdir(d): + shutil.rmtree(d) + + def cleanup_new_pgdata(self): + if getattr(self, '_new_data_dir', None): + self.remove_new_data(self._new_data_dir) + + def cleanup_old_pgdata(self): + if os.path.exists(self._old_data_dir): + logger.info('Removing %s', self._old_data_dir) + shutil.rmtree(self._old_data_dir) + return True + + def switch_pgdata(self): + self._old_data_dir = self._data_dir + '_old' + self.cleanup_old_pgdata() + os.rename(self._data_dir, self._old_data_dir) + if getattr(self, '_new_data_dir', None): + os.rename(self._new_data_dir, self._data_dir) + return True + + def switch_back_pgdata(self): + if os.path.exists(self._data_dir): + self._new_data_dir = self._data_dir + '_new' + self.cleanup_new_pgdata() + os.rename(self._data_dir, self._new_data_dir) + os.rename(self._old_data_dir, self._data_dir) + + def 
pg_upgrade(self, check=False): + upgrade_dir = self._data_dir + '_upgrade' + if os.path.exists(upgrade_dir) and os.path.isdir(upgrade_dir): + shutil.rmtree(upgrade_dir) + + os.makedirs(upgrade_dir) + + old_cwd = os.getcwd() + os.chdir(upgrade_dir) + + pg_upgrade_args = ['-k', '-j', str(psutil.cpu_count()), + '-b', self._old_bin_dir, '-B', self._bin_dir, + '-d', self._data_dir, '-D', self._new_data_dir, + '-O', "-c timescaledb.restoring='on'"] + if 'username' in self.config.superuser: + pg_upgrade_args += ['-U', self.config.superuser['username']] + + if check: + pg_upgrade_args += ['--check'] + + logger.info('Executing pg_upgrade%s', (' --check' if check else '')) + if subprocess.call([self.pgcommand('pg_upgrade')] + pg_upgrade_args) == 0: + os.chdir(old_cwd) + shutil.rmtree(upgrade_dir) + return True + + def prepare_new_pgdata(self, version): + from spilo_commons import append_extentions + + locale = self.query('SHOW lc_collate').fetchone()[0] + encoding = self.query('SHOW server_encoding').fetchone()[0] + initdb_config = [{'locale': locale}, {'encoding': encoding}] + if self.query("SELECT current_setting('data_checksums')::bool").fetchone()[0]: + initdb_config.append('data-checksums') + + logger.info('initdb config: %s', initdb_config) + + self._new_data_dir = os.path.abspath(self._data_dir) + self._old_data_dir = self._new_data_dir + '_old' + self._data_dir = self._new_data_dir + '_new' + self.remove_new_data(self._data_dir) + old_postgresql_conf = self.config._postgresql_conf + self.config._postgresql_conf = os.path.join(self._data_dir, 'postgresql.conf') + old_version_file = self._version_file + self._version_file = os.path.join(self._data_dir, 'PG_VERSION') + + self.set_bin_dir(version) + + # restore original values of archive_mode and shared_preload_libraries + if getattr(self, '_old_config_values', None): + for name, value in self._old_config_values.items(): + if value is None: + self.config.get('parameters').pop(name) + else: + 
self.config.get('parameters')[name] = value + + shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries') + if shared_preload_libraries: + self.config.get('parameters')['shared_preload_libraries'] =\ + append_extentions(shared_preload_libraries, float(version)) + + if not self.bootstrap._initdb(initdb_config): + return False + + # Copy old configs. XXX: some parameters might be incompatible! + for f in os.listdir(self._new_data_dir): + if f.startswith('postgresql.') or f.startswith('pg_hba.conf') or f == 'patroni.dynamic.json': + shutil.copy(os.path.join(self._new_data_dir, f), os.path.join(self._data_dir, f)) + + self.config.write_postgresql_conf() + self._new_data_dir, self._data_dir = self._data_dir, self._new_data_dir + self.config._postgresql_conf = old_postgresql_conf + self._version_file = old_version_file + self.configure_server_parameters() + return True + + def do_upgrade(self): + return self.pg_upgrade() and self.switch_pgdata() and self.cleanup_old_pgdata() + + def analyze(self, in_stages=False): + vacuumdb_args = ['--analyze-in-stages'] if in_stages else [] + logger.info('Rebuilding statistics (vacuumdb%s)', (' ' + vacuumdb_args[0] if in_stages else '')) + vacuumdb_args += ['-a', '-Z', '-j', str(psutil.cpu_count())] + if 'username' in self.config.superuser: + vacuumdb_args += ['-U', self.config.superuser['username']] + subprocess.call([self.pgcommand('vacuumdb')] + vacuumdb_args) + + +def PostgresqlUpgrade(config): + config['postgresql'].update({'callbacks': {}, 'pg_ctl_timeout': 3600*24*7}) + + # avoid unnecessary interactions with PGDATA and postgres + is_running = _PostgresqlUpgrade.is_running + _PostgresqlUpgrade.is_running = lambda s: False + try: + return _PostgresqlUpgrade(config['postgresql']) + finally: + _PostgresqlUpgrade.is_running = is_running diff --git a/postgres-appliance/scripts/configure_spilo.py b/postgres-appliance/scripts/configure_spilo.py index 7e8d6aef6..898637649 100755 --- 
a/postgres-appliance/scripts/configure_spilo.py +++ b/postgres-appliance/scripts/configure_spilo.py @@ -10,7 +10,6 @@ import socket import subprocess import sys -import pwd from copy import deepcopy from six.moves.urllib_parse import urlparse @@ -20,6 +19,9 @@ import pystache import requests +from spilo_commons import RW_DIR, PATRONI_CONFIG_FILE, append_extentions,\ + get_binary_version, get_bin_dir, is_valid_pg_version, write_file, write_patroni_config + PROVIDER_AWS = "aws" PROVIDER_GOOGLE = "google" @@ -30,18 +32,6 @@ KUBERNETES_DEFAULT_LABELS = '{"application": "spilo"}' MEMORY_LIMIT_IN_BYTES_PATH = '/sys/fs/cgroup/memory/memory.limit_in_bytes' PATRONI_DCS = ('zookeeper', 'exhibitor', 'consul', 'etcd3', 'etcd') - - -# (min_version, max_version, shared_preload_libraries, extwlist.extensions) -extensions = { - 'timescaledb': (9.6, 12, True, True), - 'pg_cron': (9.5, 13, True, False), - 'pg_stat_kcache': (9.4, 13, True, False), - 'pg_partman': (9.4, 13, False, True) -} -if os.environ.get('ENABLE_PG_MON') == 'true': - extensions['pg_mon'] = (11, 13, True, False) - AUTO_ENABLE_WALG_RESTORE = ('WAL_S3_BUCKET', 'WALE_S3_PREFIX', 'WALG_S3_PREFIX', 'WALG_AZ_PREFIX') @@ -67,6 +57,15 @@ def parse_args(): return args +def adjust_owner(placeholders, resource, uid=None, gid=None): + st = os.stat(placeholders['PGHOME']) + if uid is None: + uid = st.st_uid + if gid is None: + gid = st.st_gid + os.chown(resource, uid, gid) + + def link_runit_service(placeholders, name): service_dir = os.path.join(placeholders['RW_DIR'], 'service', name) if not os.path.exists(service_dir): @@ -110,9 +109,8 @@ def write_certificates(environment, overwrite): output, _ = p.communicate() logging.debug(output) - uid = pwd.getpwnam('postgres').pw_uid os.chmod(environment['SSL_PRIVATE_KEY_FILE'], 0o600) - os.chown(environment['SSL_PRIVATE_KEY_FILE'], uid, -1) + adjust_owner(environment, environment['SSL_PRIVATE_KEY_FILE'], gid=-1) def deep_update(a, b): @@ -478,7 +476,7 @@ def 
get_placeholders(provider): placeholders.setdefault('BGMON_LISTEN_IP', '0.0.0.0') placeholders.setdefault('PGPORT', '5432') placeholders.setdefault('SCOPE', 'dummy') - placeholders.setdefault('RW_DIR', '/run') + placeholders.setdefault('RW_DIR', RW_DIR) placeholders.setdefault('SSL_TEST_RELOAD', 'SSL_PRIVATE_KEY_FILE' in os.environ) placeholders.setdefault('SSL_CA_FILE', '') placeholders.setdefault('SSL_CRL_FILE', '') @@ -603,15 +601,6 @@ def get_placeholders(provider): return placeholders -def write_file(config, filename, overwrite): - if not overwrite and os.path.exists(filename): - logging.warning('File %s already exists, not overwriting. (Use option --force if necessary)', filename) - else: - with open(filename, 'w') as f: - logging.info('Writing to file %s', filename) - f.write(config) - - def pystache_render(*args, **kwargs): render = pystache.Renderer(missing_tags='strict') return render.render(*args, **kwargs) @@ -702,9 +691,9 @@ def write_wale_environment(placeholders, prefix, overwrite): 'WALG_SENTINEL_USER_DATA', 'WALG_PREVENT_WAL_OVERWRITE'] wale = defaultdict(lambda: '') - for name in ['WALE_ENV_DIR', 'SCOPE', 'WAL_BUCKET_SCOPE_PREFIX', 'WAL_BUCKET_SCOPE_SUFFIX', + for name in ['PGVERSION', 'WALE_ENV_DIR', 'SCOPE', 'WAL_BUCKET_SCOPE_PREFIX', 'WAL_BUCKET_SCOPE_SUFFIX', 'WAL_S3_BUCKET', 'WAL_GCS_BUCKET', 'WAL_GS_BUCKET', 'WAL_SWIFT_BUCKET', 'BACKUP_NUM_TO_RETAIN', - 'WALG_AZ_PREFIX'] + s3_names + swift_names + gs_names + walg_names + azure_names: + 'ENABLE_WAL_PATH_COMPAT'] + s3_names + swift_names + gs_names + walg_names + azure_names: wale[name] = placeholders.get(prefix + name, '') if wale.get('WAL_S3_BUCKET') or wale.get('WALE_S3_PREFIX') or wale.get('WALG_S3_PREFIX'): @@ -753,7 +742,7 @@ def write_wale_environment(placeholders, prefix, overwrite): prefix_env_name = write_envdir_names[0] store_type = prefix_env_name[5:].split('_')[0] if not wale.get(prefix_env_name): # WALE_*_PREFIX is not defined in the environment - bucket_path = 
'/spilo/{WAL_BUCKET_SCOPE_PREFIX}{SCOPE}{WAL_BUCKET_SCOPE_SUFFIX}/wal/'.format(**wale) + bucket_path = '/spilo/{WAL_BUCKET_SCOPE_PREFIX}{SCOPE}{WAL_BUCKET_SCOPE_SUFFIX}/wal/{PGVERSION}'.format(**wale) prefix_template = '{0}://{{WAL_{1}_BUCKET}}{2}'.format(store_type.lower(), store_type, bucket_path) wale[prefix_env_name] = prefix_template.format(**wale) # Set WALG_*_PREFIX for future compatibility @@ -766,7 +755,9 @@ def write_wale_environment(placeholders, prefix, overwrite): wale['WALE_LOG_DESTINATION'] = 'stderr' for name in write_envdir_names + ['WALE_LOG_DESTINATION'] + ([] if prefix else ['BACKUP_NUM_TO_RETAIN']): if wale.get(name): - write_file(wale[name], os.path.join(wale['WALE_ENV_DIR'], name), overwrite) + path = os.path.join(wale['WALE_ENV_DIR'], name) + write_file(wale[name], path, overwrite) + adjust_owner(placeholders, path, gid=-1) if not os.path.exists(placeholders['WALE_TMPDIR']): os.makedirs(placeholders['WALE_TMPDIR']) @@ -790,9 +781,8 @@ def write_clone_pgpass(placeholders, overwrite): 'password': escape_pgpass_value(placeholders['CLONE_PASSWORD'])} pgpass_string = "{host}:{port}:{database}:{user}:{password}".format(**r) write_file(pgpass_string, pgpassfile, overwrite) - uid = os.stat(placeholders['PGHOME']).st_uid os.chmod(pgpassfile, 0o600) - os.chown(pgpassfile, uid, -1) + adjust_owner(placeholders, pgpassfile, gid=-1) def check_crontab(user): @@ -888,18 +878,9 @@ def write_pgbouncer_configuration(placeholders, overwrite): link_runit_service(placeholders, 'pgbouncer') -def get_binary_version(bin_dir): - postgres = os.path.join(bin_dir or '', 'postgres') - version = subprocess.check_output([postgres, '--version']).decode() - version = re.match(r'^[^\s]+ [^\s]+ (\d+)(\.(\d+))?', version) - return '.'.join([version.group(1), version.group(3)]) if int(version.group(1)) < 10 else version.group(1) - - def update_bin_dir(placeholders, version): - bin_dir = '/usr/lib/postgresql/{0}/bin'.format(version) - postgres = os.path.join(bin_dir, 'postgres') 
- if os.path.isfile(postgres) and os.access(postgres, os.X_OK): # check that there is postgres binary inside - placeholders['postgresql']['bin_dir'] = bin_dir + if is_valid_pg_version(version): + placeholders['postgresql']['bin_dir'] = get_bin_dir(version) def main(): @@ -939,13 +920,14 @@ def main(): if not os.path.exists(version_file) or not config['postgresql'].get('bin_dir'): update_bin_dir(config, os.environ.get('PGVERSION', '')) - version = float(get_binary_version(config['postgresql'].get('bin_dir'))) + config['PGVERSION'] = get_binary_version(config['postgresql'].get('bin_dir')) + version = float(config['PGVERSION']) if 'shared_preload_libraries' not in user_config.get('postgresql', {}).get('parameters', {}): - libraries = [',' + n for n, v in extensions.items() if version >= v[0] and version <= v[1] and v[2]] - config['postgresql']['parameters']['shared_preload_libraries'] += ''.join(libraries) + config['postgresql']['parameters']['shared_preload_libraries'] =\ + append_extentions(config['postgresql']['parameters']['shared_preload_libraries'], version) if 'extwlist.extensions' not in user_config.get('postgresql', {}).get('parameters', {}): - extwlist = [',' + n for n, v in extensions.items() if version >= v[0] and version <= v[1] and v[3]] - config['postgresql']['parameters']['extwlist.extensions'] += ''.join(extwlist) + config['postgresql']['parameters']['extwlist.extensions'] =\ + append_extentions(config['postgresql']['parameters']['extwlist.extensions'], version, True) # Ensure replication is available if 'pg_hba' in config['bootstrap'] and not any(['replication' in i for i in config['bootstrap']['pg_hba']]): @@ -953,19 +935,17 @@ def main(): format(config['postgresql']['authentication']['replication']['username']) config['bootstrap']['pg_hba'].insert(0, rep_hba) - patroni_configfile = os.path.join(placeholders['PGHOME'], 'postgres.yml') - for section in args['sections']: logging.info('Configuring %s', section) if section == 'patroni': - 
write_file(yaml.dump(config, default_flow_style=False, width=120), patroni_configfile, args['force']) + write_patroni_config(config, args['force']) + adjust_owner(placeholders, PATRONI_CONFIG_FILE, gid=-1) link_runit_service(placeholders, 'patroni') pg_socket_dir = '/run/postgresql' if not os.path.exists(pg_socket_dir): os.makedirs(pg_socket_dir) - st = os.stat(placeholders['PGHOME']) - os.chown(pg_socket_dir, st.st_uid, st.st_gid) os.chmod(pg_socket_dir, 0o2775) + adjust_owner(placeholders, pg_socket_dir) # It is a recurring and very annoying problem with crashes (host/pod/container) # while the backup is taken in the exclusive mode which leaves the backup_label @@ -983,7 +963,6 @@ def main(): # We are not doing such trick in the Patroni (removing backup_label) because # we have absolutely no idea what software people use for backup/recovery. # In case of some home-grown solution they might end up in copying postmaster.pid... - pgdata = config['postgresql']['data_dir'] postmaster_pid = os.path.join(pgdata, 'postmaster.pid') backup_label = os.path.join(pgdata, 'backup_label') if os.path.isfile(postmaster_pid) and os.path.isfile(backup_label): diff --git a/postgres-appliance/scripts/restore_command.sh b/postgres-appliance/scripts/restore_command.sh index 92285df7b..80bcd3e7b 100755 --- a/postgres-appliance/scripts/restore_command.sh +++ b/postgres-appliance/scripts/restore_command.sh @@ -1,5 +1,25 @@ #!/bin/bash +if [[ "$ENABLE_WAL_PATH_COMPAT" = "true" ]]; then + unset ENABLE_WAL_PATH_COMPAT + bash "$(readlink -f "${BASH_SOURCE[0]}")" "$@" + exitcode=$? 
+ [[ $exitcode = 0 ]] && exit 0 + for wale_env in $(printenv -0 | tr '\n' ' ' | sed 's/\x00/\n/g' | sed -n 's/^\(WAL[EG]_[^=][^=]*_PREFIX\)=.*$/\1/p'); do + suffix=$(basename "${!wale_env}") + if [[ -x "/usr/lib/postgresql/$suffix/bin/postgres" ]]; then + prefix=$(dirname "${!wale_env}") + if [[ $prefix =~ /spilo/ ]] && [[ $prefix =~ /wal$ ]]; then + printf -v "$wale_env" "%s" "$prefix" + # shellcheck disable=SC2163 + export "$wale_env" + changed_env=true + fi + fi + done + [[ "$changed_env" == "true" ]] || exit $exitcode +fi + readonly wal_filename=$1 readonly wal_destination=$2 diff --git a/postgres-appliance/scripts/spilo_commons.py b/postgres-appliance/scripts/spilo_commons.py new file mode 100644 index 000000000..be32b0f2d --- /dev/null +++ b/postgres-appliance/scripts/spilo_commons.py @@ -0,0 +1,85 @@ +import logging +import os +import subprocess +import re +import yaml + +logger = logging.getLogger(__name__) + +RW_DIR = os.environ.get('RW_DIR', '/run') +PATRONI_CONFIG_FILE = os.path.join(RW_DIR, 'postgres.yml') +LIB_DIR = '/usr/lib/postgresql' + +# (min_version, max_version, shared_preload_libraries, extwlist.extensions) +extensions = { + 'timescaledb': (9.6, 12, True, True), + 'pg_cron': (9.5, 13, True, False), + 'pg_stat_kcache': (9.4, 13, True, False), + 'pg_partman': (9.4, 13, False, True) +} +if os.environ.get('ENABLE_PG_MON') == 'true': + extensions['pg_mon'] = (11, 13, True, False) + + +def adjust_extensions(old, version, extwlist=False): + ret = [] + for name in old.split(','): + name = name.strip() + value = extensions.get(name) + if name not in ret and (value is None or value[0] <= version <= value[1] and (not extwlist or value[3])): + ret.append(name) + return ','.join(ret) + + +def append_extentions(old, version, extwlist=False): + extwlist = 3 if extwlist else 2 + ret = [] + + def maybe_append(name): + value = extensions.get(name) + if name not in ret and (value is None or value[0] <= version <= value[1] and value[extwlist]): + ret.append(name) + 
+ for name in old.split(','): + maybe_append(name.strip()) + + for name in extensions.keys(): + maybe_append(name) + + return ','.join(ret) + + +def get_binary_version(bin_dir): + postgres = os.path.join(bin_dir or '', 'postgres') + version = subprocess.check_output([postgres, '--version']).decode() + version = re.match(r'^[^\s]+ [^\s]+ (\d+)(\.(\d+))?', version) + return '.'.join([version.group(1), version.group(3)]) if int(version.group(1)) < 10 else version.group(1) + + +def get_bin_dir(version): + return '{0}/{1}/bin'.format(LIB_DIR, version) + + +def is_valid_pg_version(version): + bin_dir = get_bin_dir(version) + postgres = os.path.join(bin_dir, 'postgres') + # check that there is postgres binary inside + return os.path.isfile(postgres) and os.access(postgres, os.X_OK) + + +def write_file(config, filename, overwrite): + if not overwrite and os.path.exists(filename): + logger.warning('File %s already exists, not overwriting. (Use option --force if necessary)', filename) + else: + with open(filename, 'w') as f: + logger.info('Writing to file %s', filename) + f.write(config) + + +def get_patroni_config(): + with open(PATRONI_CONFIG_FILE) as f: + return yaml.safe_load(f) + + +def write_patroni_config(config, force): + write_file(yaml.dump(config, default_flow_style=False, width=120), PATRONI_CONFIG_FILE, force) diff --git a/postgres-appliance/tests/docker-compose.yml b/postgres-appliance/tests/docker-compose.yml index 25e1db055..d104ed5e5 100644 --- a/postgres-appliance/tests/docker-compose.yml +++ b/postgres-appliance/tests/docker-compose.yml @@ -23,6 +23,7 @@ services: command: "sh -c 'exec etcd -name etcd1 -listen-client-urls http://0.0.0.0:2379 -advertise-client-urls http://$$(hostname --ip-address):2379'" spilo1: &spilo + depends_on: [ minio ] image: spilo networks: [ demo ] environment: @@ -44,7 +45,7 @@ services: postgresql: parameters: shared_buffers: 32MB - PGVERSION: '9.6' + PGVERSION: '9.5' # Just to test upgrade with clone. 
Without CLONE_SCOPE they don't work CLONE_WAL_S3_BUCKET: *bucket CLONE_AWS_ACCESS_KEY_ID: *access_key diff --git a/postgres-appliance/tests/schema.sql b/postgres-appliance/tests/schema.sql index 2806d437e..5a5fbae13 100644 --- a/postgres-appliance/tests/schema.sql +++ b/postgres-appliance/tests/schema.sql @@ -1,15 +1,8 @@ CREATE DATABASE test_db; \c test_db -CREATE EXTENSION timescaledb; - -CREATE TABLE "fOo" (id bigint NOT NULL PRIMARY KEY); -SELECT create_hypertable('"fOo"', 'id', chunk_time_interval => 100000); -INSERT INTO "fOo" SELECT * FROM generate_series(1, 3000000); -ALTER TABLE "fOo" ALTER COLUMN id SET STATISTICS 500; - CREATE UNLOGGED TABLE "bAr" ("bUz" INTEGER); ALTER TABLE "bAr" ALTER COLUMN "bUz" SET STATISTICS 500; -INSERT INTO "bAr" SELECT * FROM generate_series(1, 300000); +INSERT INTO "bAr" SELECT * FROM generate_series(1, 100000); CREATE TABLE with_oids() WITH OIDS; diff --git a/postgres-appliance/tests/schema2.sql b/postgres-appliance/tests/schema2.sql new file mode 100644 index 000000000..8b397772a --- /dev/null +++ b/postgres-appliance/tests/schema2.sql @@ -0,0 +1,8 @@ +\c test_db + +CREATE EXTENSION timescaledb; + +CREATE TABLE "fOo" (id bigint NOT NULL PRIMARY KEY); +SELECT create_hypertable('"fOo"', 'id', chunk_time_interval => 100000); +INSERT INTO "fOo" SELECT * FROM generate_series(1, 1000000); +ALTER TABLE "fOo" ALTER COLUMN id SET STATISTICS 500; diff --git a/postgres-appliance/tests/test_spilo.sh b/postgres-appliance/tests/test_spilo.sh index f3ff873cc..8b47ec44d 100644 --- a/postgres-appliance/tests/test_spilo.sh +++ b/postgres-appliance/tests/test_spilo.sh @@ -3,6 +3,7 @@ cd "$(dirname "${BASH_SOURCE[0]}")" || exit 1 readonly PREFIX="demo-" +readonly UPGRADE_SCRIPT="python3 /scripts/inplace_upgrade.py" readonly TIMEOUT=120 if [[ -t 2 ]]; then @@ -32,6 +33,16 @@ function stop_containers() { docker-compose rm -fs } +function get_non_leader() { + declare -r container=$1 + + if [[ "$container" == "${PREFIX}spilo1" ]]; then + echo 
"${PREFIX}spilo2" + else + echo "${PREFIX}spilo1" + fi +} + function docker_exec() { declare -r cmd=${*: -1:1} docker exec "${@:1:$(($#-1))}" su postgres -c "$cmd" @@ -111,30 +122,80 @@ function create_schema() { docker_exec -i "$1" "psql -U postgres" < schema.sql } +function create_schema2() { + docker_exec -i "$1" "psql -U postgres" < schema2.sql +} + function drop_table_with_oids() { docker_exec "$1" "psql -U postgres -d test_db -c 'DROP TABLE with_oids'" } +function drop_timescaledb() { + docker_exec "$1" "psql -U postgres -d test_db -c 'DROP EXTENSION timescaledb CASCADE'" +} + +function test_inplace_upgrade_wrong_container() { + ! docker_exec "$(get_non_leader "$1")" "PGVERSION=10 $UPGRADE_SCRIPT 4" +} + +function test_inplace_upgrade_wrong_version() { + docker_exec "$1" "PGVERSION=9.5 $UPGRADE_SCRIPT 3" 2>&1 | grep 'Upgrade is not required' +} + +function test_inplace_upgrade_wrong_capacity() { + docker_exec "$1" "PGVERSION=10 $UPGRADE_SCRIPT 4" 2>&1 | grep 'number of replicas does not match' +} + +function test_successful_inplace_upgrade_to_9_6() { + docker_exec "$1" "PGVERSION=9.6 $UPGRADE_SCRIPT 3" +} + +function test_envdir_suffix() { + docker_exec "$1" "cat /run/etc/wal-e.d/env/WALG_S3_PREFIX" | grep -q "$2$" \ + && docker_exec "$1" "cat /run/etc/wal-e.d/env/WALE_S3_PREFIX" | grep -q "$2$" +} + +function test_envdir_updated_to_x() { + for c in {1..3}; do + test_envdir_suffix "${PREFIX}spilo$c" "$1" || return 1 + done +} + +function test_failed_inplace_upgrade_big_replication_lag() { + ! test_successful_inplace_upgrade_to_9_6 "$1" +} + +function test_successful_inplace_upgrade_to_12() { + docker_exec "$1" "PGVERSION=12 $UPGRADE_SCRIPT 3" +} + +function test_pg_upgrade_to_12_check_failed() { + ! test_successful_inplace_upgrade_to_12 "$1" +} + +function test_successful_inplace_upgrade_to_13() { + docker_exec "$1" "PGVERSION=13 $UPGRADE_SCRIPT 3" +} + +function test_pg_upgrade_to_13_check_failed() { + ! 
test_successful_inplace_upgrade_to_13 "$1" +} + function start_clone_with_wale_upgrade_container() { + local ID=${1:-1} + docker-compose run \ -e SCOPE=upgrade \ -e PGVERSION=10 \ -e CLONE_SCOPE=demo \ -e CLONE_METHOD=CLONE_WITH_WALE \ -e CLONE_TARGET_TIME="$(date -d '1 minute' -u +'%F %T UTC')" \ - --name "${PREFIX}upgrade1" \ - -d spilo1 + --name "${PREFIX}upgrade$ID" \ + -d "spilo$ID" } function start_clone_with_wale_upgrade_replica_container() { - docker-compose run \ - -e SCOPE=upgrade \ - -e PGVERSION=10 \ - -e CLONE_SCOPE=demo \ - -e CLONE_METHOD=CLONE_WITH_WALE \ - -e CLONE_TARGET_TIME="$(date -d '1 minute' -u +'%F %T UTC')" \ - --name "${PREFIX}upgrade2" \ - -d spilo2 + start_clone_with_wale_upgrade_container 2 } function start_clone_with_basebackup_upgrade_container() { @@ -153,11 +214,11 @@ function start_clone_with_basebackup_upgrade_container() { } function verify_clone_with_wale_upgrade() { - wait_query "$1" "SELECT current_setting('server_version_num')::int/10000" 10 + wait_query "$1" "SELECT current_setting('server_version_num')::int/10000" 10 2> /dev/null } function verify_clone_with_basebackup_upgrade() { - wait_query "$1" "SELECT current_setting('server_version_num')::int/10000" 11 + wait_query "$1" "SELECT current_setting('server_version_num')::int/10000" 11 2> /dev/null } function run_test() { @@ -168,18 +229,61 @@ function run_test() { function test_spilo() { local container=$1 + run_test test_envdir_suffix "$container" 9.5 + + run_test test_inplace_upgrade_wrong_version "$container" + run_test test_inplace_upgrade_wrong_capacity "$container" + wait_all_streaming "$container" create_schema "$container" || exit 1 + # run_test test_failed_inplace_upgrade_big_replication_lag "$container" + wait_zero_lag "$container" wait_backup "$container" + log_info "Testing in-place major upgrade 9.5->9.6" + run_test test_successful_inplace_upgrade_to_9_6 "$container" + + wait_all_streaming "$container" + + run_test test_envdir_updated_to_x 9.6 + + run_test 
test_pg_upgrade_to_12_check_failed "$container" # pg_upgrade --check complains about OID + + create_schema2 "$container" || exit 1 + + wait_backup "$container" + wait_zero_lag "$container" + local upgrade_container upgrade_container=$(start_clone_with_wale_upgrade_container) - log_info "Started $upgrade_container for testing major upgrade after clone with wal-e" + log_info "Started $upgrade_container for testing major upgrade 9.6->10 after clone with wal-e" + + drop_table_with_oids "$container" + log_info "Testing in-place major upgrade 9.6->12" + run_test test_successful_inplace_upgrade_to_12 "$container" + + wait_all_streaming "$container" + + run_test test_envdir_updated_to_x 12 + + run_test test_pg_upgrade_to_13_check_failed "$container" # pg_upgrade --check complains about timescaledb + + wait_backup "$container" - log_info "Waiting for clone with wal-e and upgrade to complete..." + drop_timescaledb "$container" + log_info "Testing in-place major upgrade to 12->13" + run_test test_successful_inplace_upgrade_to_13 "$container" + + wait_all_streaming "$container" + + run_test test_envdir_updated_to_x 13 + + wait_backup "$container" + + log_info "Waiting for clone with wal-e and upgrade 9.6->10 to complete..." find_leader "$upgrade_container" > /dev/null docker logs "$upgrade_container" run_test verify_clone_with_wale_upgrade "$upgrade_container" @@ -192,12 +296,13 @@ function test_spilo() { local basebackup_container basebackup_container=$(start_clone_with_basebackup_upgrade_container "$upgrade_container") - log_info "Started $basebackup_container for testing major upgrade after clone with basebackup" + log_info "Started $basebackup_container for testing major upgrade 10->11 after clone with basebackup" + log_info "Waiting for postgres to start in the $upgrade_replica_container..." run_test verify_clone_with_wale_upgrade "$upgrade_replica_container" - log_info "Waiting for clone with basebackup and upgrade to complete..." 
+ log_info "Waiting for clone with basebackup and upgrade 10->11 to complete..." find_leader "$basebackup_container" > /dev/null docker logs "$basebackup_container" run_test verify_clone_with_basebackup_upgrade "$basebackup_container"