#!/usr/bin/env perl

use warnings;
# vi: set ts=4 sw=4 :
#____________________________________________________________________________
#
#   MusicBrainz -- the open internet music database
#
#   Copyright (C) 1998 Robert Kaye
#
#   This program is free software; you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation; either version 2 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program; if not, write to the Free Software
#   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
#   $Id$
#____________________________________________________________________________

use FindBin;
use lib "$FindBin::Bin/../../lib";

use strict;
use DBDefs;
use integer;

use Getopt::Long;
use Try::Tiny;
use LWP::UserAgent;
use HTTP::Status qw( RC_OK RC_NOT_FOUND RC_NOT_MODIFIED );
use GnuPG qw( :algo );

use MusicBrainz::Server::Replication qw( :replication_type );

# Refuse to run unless DBDefs reports the slave replication type.
die "This is not a slave server!\n"
    unless DBDefs->REPLICATION_TYPE == RT_SLAVE;

# Settings that have no corresponding command-line option.
my $fProgress  = -t STDOUT;    # true when STDOUT is attached to a terminal
my $tmpdir     = '/tmp';       # downloads and unpack directories live here
my $fKeepFiles = 0;            # when true, tempdir() is asked not to clean up
my $count      = 0;            # packets applied so far during this run

# Option values, initialised to their defaults.
my $fHelp;
my $deflockfile = '/tmp/.mb-LoadReplicationChanges';
my $lockfile    = $deflockfile;
my $baseuri     = 'http://data.musicbrainz.org/pub/musicbrainz/data/replication';
my @process_opts;
my $limit       = 0;
my $period      = 'hourly';
my $catchup;
my $fVerifySig  = 0;

# Parse the command line; a malformed option ends the run with status 2.
unless (GetOptions(
    'help'          => \$fHelp,
    'lockfile=s'    => \$lockfile,
    'base-uri=s'    => \$baseuri,
    'process-arg=s' => \@process_opts,
    'limit=i'       => \$limit,
    'period=s'      => \$period,
    'catchup'       => \$catchup,
    'verify-sig'    => \$fVerifySig,
)) {
    exit 2;
}

# Drop one trailing slash so that URLs can be built as "$baseuri/...".
$baseuri =~ s{/$}{};

# Print the command-line help text to STDOUT.  The default lock file path
# ($deflockfile) is interpolated into the text.
sub usage
{
    my $help_text = <<"EOF";
Usage: LoadReplicationChanges [options]

    --help             show this help
    --lockfile=FILE    use FILE as the lock file to prevent us running in
                       parallel (default: $deflockfile)
    --base-uri=URI     load the replication data from this location
                       (default: from data.musicbrainz.org via HTTP)
    --process-arg=ARG  Add ARG to each invocation of ProcessReplicationChanges.
                       If required, ARG can be an option (e.g.
                       --process-arg=--debug-xact).  Add --process-arg=ARG
                       again to specify additional arguments.
    --limit=N          apply only N packets
    --period=ARG       ARG can be e.g. 'daily' or 'weekly'; default 'hourly'.
                       This will only work if you're currently in sync with
                       these replication options.
    --catchup          For periods other than 'hourly', apply hourly packets
                       until in sync with available packets for the provided
                       period
    --verify-sig       If this option is present the replication packets will
                       be verified against their signature.

EOF
    print $help_text;
}

# --help prints the usage and exits successfully; any stray positional
# argument prints the usage and exits with status 2.
if ($fHelp) {
    usage();
    exit;
}
if (@ARGV) {
    usage();
    exit 2;
}

# Turn an interrupt into an exception.
$SIG{INT} = sub { die "SIGINT\n" };

# Take an exclusive, non-blocking lock on the lock file so that only one
# instance runs at a time.  The handle stays open (and the lock held) for
# the rest of the run.
use Fcntl qw( LOCK_EX LOCK_NB O_CREAT O_WRONLY );
my $lockfh;
sysopen($lockfh, $lockfile, O_CREAT|O_WRONLY, 0600)
    or die "open >$lockfile: $!";
unless (flock($lockfh, LOCK_EX|LOCK_NB)) {
    # Anything other than "would block" is a genuine locking failure
    die "flock $lockfile: $!" unless $!{EWOULDBLOCK};

    # Somebody else holds the lock: report it and leave quietly
    print localtime() . " : There is already an instance of LoadReplicationChanges running - aborting\n";
    exit;
}

use MusicBrainz::Server::Context;
use Sql;

# Create a read-write script context and an Sql wrapper around its connection
my $c = MusicBrainz::Server::Context->create_script_context(database => 'READWRITE');
my $sql = Sql->new($c->conn);
my $dbh = $c->dbh;

# Retrieve current_schema_sequence, current_replication_sequence and
# last_replication_date.  All three stay undefined when the query yields
# no row.
my ($iSchemaSequence, $iReplicationSequence, $dtReplicationDate);
{
    my $control = $sql->select_single_row_hash("SELECT * FROM replication_control") || {};
    $iSchemaSequence      = $control->{current_schema_sequence};
    $iReplicationSequence = $control->{current_replication_sequence};
    $dtReplicationDate    = $control->{last_replication_date};
}

# The schema sequence of the code must equal the one stored in the database
if (!($iSchemaSequence == DBDefs->DB_SCHEMA_SEQUENCE))
{
    my $now = localtime;
    printf STDERR "%s : Schema sequence mismatch - codebase is %d, database is %d\n",
        $now, DBDefs->DB_SCHEMA_SEQUENCE, $iSchemaSequence;
    exit 1;
}

# Without a replication sequence there is nothing to continue from
if (not defined $iReplicationSequence)
{
    print scalar(localtime),
        " : This database does not correspond to any replication sequence",
        " - you cannot update this database using replication\n";
    exit 1;
}

check_foreign_keys();

# Rows still sitting in dbmirror_pending mean an earlier load was interrupted
# after import: skip the download and go straight to applying them.
if (my $pending_rows = $sql->select_single_value('SELECT COUNT(*) FROM dbmirror_pending'))
{
    print localtime() . " : Continuing a previously aborted load\n";
    goto APPLY_CHANGES;
}

NEXT_PACKET:

# Download next replication packet (current_replication_sequence+1).
# Hourly packets live directly under the base URI; packets for any other
# period carry the period in their file name and live in a sub-directory
# named after it.

my $iNextReplicationSequence = $iReplicationSequence + 1;
my $file = "replication-$iNextReplicationSequence.tar.bz2";
my $uriprefix = '';
if ($period ne 'hourly') {
    $file = "replication-$period-$iNextReplicationSequence.tar.bz2";
    $uriprefix = "$period/";
}
my $url = "$baseuri/$uriprefix$file";
my $localfile = "$tmpdir/$file";

my $resp = retrieve_remote_file($url, $localfile);

# We only understand a limited set of responses:
# OK (or not modified, when the local copy is current), not found, and the rest.
if ($catchup) {
    # In catch-up mode, the availability of the packet for the requested
    # period means we have reached it and are done.
    if ($resp->code == RC_OK or $resp->code == RC_NOT_MODIFIED) {
        print localtime() . " : Caught up to packets for period '$period'. Please rerun now without --catchup.\n";
        exit 0;
    }

    # Otherwise keep moving forward one hourly packet at a time
    print localtime() . " : Not yet caught up for period '$period'. Applying an hourly packet...\n";
    $file = "replication-$iNextReplicationSequence.tar.bz2";
    $uriprefix = '';
    $url = "$baseuri/$uriprefix$file";
    $localfile = "$tmpdir/$file";
    $resp = retrieve_remote_file($url, $localfile);
}

if ($resp->code == RC_NOT_FOUND)
{
    # TODO check for newer replication packets, in case the server has lost one?
    # die if so

    # Otherwise exit ok
    print localtime() . " : Replication packet #$iNextReplicationSequence not available\n";
    exit 0;
}

unless ($resp->code == RC_OK or $resp->code == RC_NOT_MODIFIED)
{
    print $resp->as_string;
    die "Unexpected response while downloading $url: " . $resp->status_line . "\n";
}

# Only a packet that was actually retrieved can be checked against its
# signature; failed downloads have already been dealt with above.
validate_file_signature($localfile);

# We successfully retrieved the replication packet.
# Decompress it to a temporary directory

use File::Temp qw( tempdir );
# From here on an interrupt ends the run with exit status 3
$SIG{'INT'} = sub { exit 3 };
my $mydir = tempdir("loadrep-XXXXXX", DIR => $tmpdir, CLEANUP => not $fKeepFiles);

print localtime() . " : Decompressing $localfile to $mydir\n";
system "/bin/tar",
    "-C", $mydir,
    "--bzip2",
    "-xvf",
    $localfile,
    ;
# $? is the raw wait status (the child's exit code sits in the high byte).
# Handing it to exit() unchanged would be truncated to 8 bits, so a child
# exit code of 1 (status 256) would make this script exit 0.  Propagate the
# child's exit code instead, or 1 if it could not be run or died on a signal.
exit( $? == -1 ? 1 : ( ($? >> 8) || 1 ) ) if $?;

unlink $localfile
    or warn "unlink $localfile: $!\n";

# A detached signature downloaded alongside the packet is of no further use
# once the packet itself is gone; do not let these pile up in $tmpdir.
my $sigfile = "$localfile.asc";
if ($fVerifySig and -e $sigfile) {
    unlink $sigfile
        or warn "unlink $sigfile: $!\n";
}

# The packet carries small text files describing itself.  SCHEMA_SEQUENCE
# must hold a bare number equal to the database's current_schema_sequence.
my $SCHEMA_SEQUENCE = read_file("SCHEMA_SEQUENCE");
if (defined($SCHEMA_SEQUENCE) and $SCHEMA_SEQUENCE =~ /\A(\d+)\n\z/)
{
    $SCHEMA_SEQUENCE = $1;
}
else
{
    print localtime() . " : SCHEMA_SEQUENCE file missing or malformed\n";
    exit 1;
}

if ($SCHEMA_SEQUENCE != $iSchemaSequence)
{
    my $now = localtime;
    printf "%s : This replication packet matches schema sequence #%d, but the database is currently at #%d\n",
        $now, $SCHEMA_SEQUENCE, $iSchemaSequence;
    print localtime() . " : You must upgrade your database in order to apply this replication packet\n";
    exit 1;
}

# REPLICATION_SEQUENCE is mandatory and is compared further down against the
# sequence number we asked for.
my $REPLICATION_SEQUENCE = read_file("REPLICATION_SEQUENCE");
if (defined($REPLICATION_SEQUENCE) and $REPLICATION_SEQUENCE =~ /\A(\d+)\n\z/)
{
    $REPLICATION_SEQUENCE = $1;
}
else
{
    print localtime() . " : REPLICATION_SEQUENCE file missing or malformed\n";
    exit 1;
}

# LAST_REPLICATION_SEQUENCE is optional: when it cannot be used, fall back
# to REPLICATION_SEQUENCE.
my $LAST_REPLICATION_SEQUENCE = read_file("LAST_REPLICATION_SEQUENCE");
if (defined($LAST_REPLICATION_SEQUENCE) and $LAST_REPLICATION_SEQUENCE =~ /\A(\d+)\n\z/)
{
    $LAST_REPLICATION_SEQUENCE = $1;
}
else
{
    print localtime() . " : LAST_REPLICATION_SEQUENCE file missing or malformed\n";
    $LAST_REPLICATION_SEQUENCE = $REPLICATION_SEQUENCE;
}

# The packet must be the one we thought we had downloaded
if ($REPLICATION_SEQUENCE != $iNextReplicationSequence)
{
    print localtime(), " : Oops!  Downloaded packet $file",
        ", but instead of being sequence #$iNextReplicationSequence",
        " it's actually #$REPLICATION_SEQUENCE!\n";
    print localtime() . " : Please tell support\@musicbrainz.org!\n";
    exit 1;
}

# TIMESTAMP is informational only: show it when it is a single line,
# otherwise carry on regardless.
my $TIMESTAMP = read_file("TIMESTAMP");
if (defined($TIMESTAMP) and $TIMESTAMP =~ /\A(.*)\n\z/)
{
    $TIMESTAMP = $1;
    chomp $TIMESTAMP;
    print localtime() . " : This packet was produced (or begins) at $TIMESTAMP\n";
}
else
{
    print localtime() . " : TIMESTAMP file missing or malformed, ignoring\n";
}

# LAST_TIMESTAMP falls back to whatever TIMESTAMP currently holds
my $LAST_TIMESTAMP = read_file("LAST_TIMESTAMP");
if (defined($LAST_TIMESTAMP) and $LAST_TIMESTAMP =~ /\A(.*)\n\z/)
{
    $LAST_TIMESTAMP = $1;
    chomp $LAST_TIMESTAMP;
    print localtime() . " : This packet covers changes until $LAST_TIMESTAMP\n";
}
else
{
    print localtime() . " : LAST_TIMESTAMP file missing or malformed, setting to TIMESTAMP\n";
    $LAST_TIMESTAMP = $TIMESTAMP;
}

# Check existence of the dbmirror_pending and dbmirror_pendingdata data files
for my $f (qw( dbmirror_pending dbmirror_pendingdata ))
{
    next if -f "$mydir/mbdump/$f";
    print localtime() . " : There is no mbdump/$f file in this archive!\n";
    print localtime() . " : Please tell support\@musicbrainz.org!\n";
    exit 1;
}

# Load them
print localtime() . " : Importing dbmirror_pending and dbmirror_pendingdata files\n";
system "$FindBin::Bin/ImportReplicationChanges",
    "--tmp-dir", $tmpdir,
    "--",
    "$mydir/mbdump/dbmirror_pending",
    "$mydir/mbdump/dbmirror_pendingdata",
    ;
# $? is the raw wait status (the child's exit code sits in the high byte).
# Handing it to exit() unchanged would be truncated to 8 bits, so a child
# exit code of 1 (status 256) would make this script exit 0.  Propagate the
# child's exit code instead, or 1 if it could not be run or died on a signal.
exit( $? == -1 ? 1 : ( ($? >> 8) || 1 ) ) if $?;

# Wipe the temporary directory
print localtime() . " : Removing $mydir\n";
use File::Path qw( rmtree );
rmtree($mydir);

# If we had some data leftover in the tables, we jump straight in here:
APPLY_CHANGES:
# Apply the changes (ProcessReplicationChanges)
system "$FindBin::Bin/ProcessReplicationChanges", @process_opts;
# $? is the raw wait status (the child's exit code sits in the high byte).
# Handing it to exit() unchanged would be truncated to 8 bits, so a child
# exit code of 1 (status 256) would make this script exit 0.  Propagate the
# child's exit code instead, or 1 if it could not be run or died on a signal.
exit( $? == -1 ? 1 : ( ($? >> 8) || 1 ) ) if $?;

# Optional site-specific hook, run after each successfully applied packet
if (-x "$FindBin::Bin/hooks/post-process") {
    print localtime() . " : Running post-process hook\n";
    system "$FindBin::Bin/hooks/post-process";
    exit( $? == -1 ? 1 : ( ($? >> 8) || 1 ) ) if $?;
}

# Check that current_replication_sequence has reached the expected value.
# Also last_replication_date should match LAST_TIMESTAMP.
# When we arrived here by resuming an aborted load, no packet metadata was
# read, so assume the sequence simply went up by one and skip the date check.
$LAST_REPLICATION_SEQUENCE = $iReplicationSequence + 1
    if not defined $LAST_REPLICATION_SEQUENCE;
{
    my $row = $sql->select_single_row_hash("SELECT * FROM replication_control");
    $row->{current_replication_sequence} == $LAST_REPLICATION_SEQUENCE
        or die "Applied changes, but current_replication_sequence is $row->{current_replication_sequence} not $LAST_REPLICATION_SEQUENCE\n";
    not defined($LAST_TIMESTAMP)
        or $row->{last_replication_date} eq $LAST_TIMESTAMP
        or warn "Applied changes, but last_replication_date is '$row->{last_replication_date}' not '$LAST_TIMESTAMP'\n";
    # Continue from whatever the database now says is current
    $iReplicationSequence = $row->{current_replication_sequence};
}
$dtReplicationDate = $LAST_TIMESTAMP;

# Honour --limit (0 means no limit)
$count += 1;
if ($limit > 0 && $count >= $limit) {
    print localtime() . " : Hit packet limit $limit.\n";
    exit 0;
}
# Loop back round and start again
goto NEXT_PACKET;

# Mirror a remote file to a local path and, when --verify-sig is in effect,
# also mirror its detached signature ("<url>.asc" to "<file>.asc").
#
#   $url   - location of the remote file
#   $file  - local path to store it at
#
# Returns the response object for the file itself (never the one for the
# signature), so callers can inspect its status code.
sub retrieve_remote_file {
    my ($url, $file) = @_;

    # Initialise User Agent
    my $ua = LWP::UserAgent->new(
        agent => '$Id$',
    );
    $ua->env_proxy;

    # Fetch the file and inform the user about what is being done
    print localtime() . " : Downloading $url to $file\n";
    my $f_resp = $ua->mirror($url, $file);

    # A signature is only of interest when verification was requested and
    # there is a file to verify; don't ask for one after a failed download.
    return $f_resp if !$fVerifySig or $f_resp->is_error;

    # Identify signature URL and local filename
    my $sig_url  = $url . '.asc';
    my $sig_file = $file . '.asc';

    # Fetch file signature and inform the user about what is being done
    print localtime() . " : Downloading $sig_url to $sig_file\n";
    my $s_resp = $ua->mirror($sig_url, $sig_file);

    # A missing signature is not fatal at this point (what happens next is
    # decided when the signature is validated), but say why it is missing.
    warn localtime() . " : Could not download $sig_url: " . $s_resp->status_line . "\n"
        if $s_resp->is_error;

    return $f_resp;
}

# Verify a downloaded file against its detached GPG signature.
#
#   $file       - path of the file to verify
#   $signature  - path of the signature (optional, defaults to "$file.asc")
#
# Does nothing unless --verify-sig was given.  Dies when the file is
# missing, when verification fails, or when the signature is missing and
# DBDefs->GPG_MISSING_SIGNATURE_MODE matches /FAIL/i; with any other mode a
# missing signature is silently accepted.
sub validate_file_signature {
    my ($file, $signature) = @_;

    # Only proceed if signature verification was requested
    return unless $fVerifySig;

    # Make sure that the file to be tested exists
    die "Can't find file $file: $!" unless (-e $file);

    # Fall back to the default signature if one is not supplied
    $signature = $file . ".asc" unless $signature;

    # Make sure that the signature exists
    unless (-e $signature) {
        # React based on the configured mode for missing signature
        if (DBDefs->GPG_MISSING_SIGNATURE_MODE =~ m/FAIL/i) {
            die "Can't find signature file $signature: $!";
        }
        return;
    }

    # Initialise GnuPG module in local scope
    my $gpg = GnuPG->new;

    # Attempt to verify file signature
    try {
        $gpg->import_keys( keys => [ DBDefs->GPG_PUB_KEY ] );
        $gpg->verify( signature => $signature, file => $file );
    }

    # Exit with an error if verification failed
    catch {
        # Plain print: the file name and the GPG error text are data and
        # must not be interpreted as a printf format string.
        print STDERR "Failed to verify signature for $file \n"
            . "GPG returned $_";
        die "Signature verification failed for $file\n";
    };
}

# Return the entire contents of a file inside the unpacked packet directory
# ($mydir) as one string, or undef if the file cannot be opened.
#
#   $file - file name relative to $mydir
sub read_file
{
    my $file = shift;

    # Three-argument open: the path is never parsed for a mode or trimmed
    open(my $fh, '<', "$mydir/$file")
        or return undef;

    # Slurp the whole file in one read
    local $/;
    my $content = <$fh>;
    close $fh;

    return $content;
}

# Check that there are no foreign key constraints which link non-replicated
# tables to replicated ones.  The existence of such constraints would almost
# certainly cause the processing of replication data to fail.

# Look for foreign keys that point from a non-replicated, non-empty table to
# a replicated one.  Returns quietly when none are found; otherwise prints
# remediation SQL and exits with status 1.
sub check_foreign_keys
{
    # Names of the non-replicated tables.
    # NOTE(review): this set is empty, so as written no constraint can ever
    # be reported - confirm whether it is meant to be populated.
    my %non_rep = ();

    # Every foreign key constraint, with the referencing and referenced tables
    my $constraints = $sql->select_list_of_hashes("
        SELECT  c.conname, a.relname AS fk_table, b.relname AS pk_table
        FROM    pg_constraint c, pg_class a, pg_class b
        WHERE   a.oid = c.conrelid
        AND             b.oid = c.confrelid
        AND             contype = 'f'
    ");

    # Keep the constraints leading from a non-replicated table to a
    # replicated one ...
    my @candidates;
    for my $fk (@$constraints)
    {
        next unless $non_rep{ $fk->{fk_table} };
        next if $non_rep{ $fk->{pk_table} };
        push @candidates, $fk;
    }

    # ... and of those, only the ones whose referencing table holds rows
    my @bad;
    for my $fk (@candidates)
    {
        my $rows = $sql->select_single_value(
            "SELECT COUNT(*) FROM $fk->{fk_table}",
        );
        push @bad, $fk if $rows;
    }

    return unless @bad;

    print localtime() . " : Problem with foreign keys detected:\n";
    print <<EOF;

Your database includes foreign keys from non-replicated tables to replicated
ones, and the non-replicated tables are not empty.  This will almost certainly
cause replication processing to fail.  To load replication data, you'll need
to do one or both of the following:

EOF

    print "Remove the foreign keys:\n";
    for my $fk (@bad)
    {
        printf "  ALTER TABLE %s DROP CONSTRAINT %s; -- refers to %s\n",
            $fk->{fk_table}, $fk->{conname}, $fk->{pk_table};
    }
    print "\n";

    print "Empty the tables:\n";
    for my $fk (@bad)
    {
        printf "  DELETE FROM %s;\n", $fk->{fk_table};
    }
    print "\n";

    print localtime() . " : Replication abandoned\n";
    exit 1;
}

# eof LoadReplicationChanges
