#!/usr/bin/env perl

use warnings;
# vi: set ts=4 sw=4 :
#############################################################################
#
# Portions (C) 2003 Robert Kaye
#
# Formerly DBMirror.pl
# Contains the Database mirroring script.
# This script queries the dbmirror_pending table off the database specified
# (along with the associated schema) for updates that are pending on a
# specific host.  The database on that host is then updated with the changes.
#
#
#     Written by Steven Singer (ssinger@navtechinc.com)
#     (c) 2001-2002 Navtech Systems Support Inc.
# ALL RIGHTS RESERVED;
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose, without fee, and without a written agreement
# is hereby granted, provided that the above copyright notice and this
# paragraph and the following two paragraphs appear in all copies.
#
# IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
# DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
# ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
# PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
##############################################################################
# $Id$
#
##############################################################################

use strict;

use FindBin;
use lib "$FindBin::Bin/../../lib";

use Getopt::Long;
use DBI;
use DBDefs;
use MusicBrainz::Server::Context;
use Sql;

# Command-line state.  Each value is the default and may be overridden by
# the matching option passed to GetOptions below.
my $long_sql = 0;       # --long-sql
my $short_sql = 0;      # --short-sql
my $debug_xact = 0;     # --debug-xact
my $skip_seqid;         # --skip-seqid / -s (undef means "skip nothing")
my $limit = 10000;      # --limit: batch size used for FETCH FORWARD

# Usage text.  The heredoc interpolates, so "$limit" shows the default above.
my $help = <<EOF;
Usage: ProcessReplicationChanges [OPTIONS]

Options are:
        --short-sql     Show a summary of each "write" statement executed
        --long-sql      Show the full text of each "write" statement executed
        --debug-xact    Show when each transaction starts / ends
    -s, --skip-seqid=N  Ignore SeqId's up to and including SeqId N
        --limit=N       Update, insert, or delete at most N rows at a time
                        This option is useful if you have a system with a small
                        amount of memory - a setting of 10,000 works well for
                        machines with 64MB RAM. The default is $limit.
    -h, --help          Show this help

EOF

# Unknown options or malformed values make GetOptions return false; that is
# reported as a usage error (exit status 2).  --help exits with status 0.
GetOptions(
    "short-sql"         => \$short_sql,
    "long-sql"          => \$long_sql,
    "debug-xact"        => \$debug_xact,
    "skip-seqid|s=i"    => \$skip_seqid,
    "limit=i"           => \$limit,
    "help|h"            => sub { print $help; exit },
) or exit 2;

# No positional arguments are accepted.
print($help), exit 2 if @ARGV;

# The batch size is interpolated into "FETCH FORWARD $limit".  A value of 0
# fetches nothing, so the statements of every transaction would be skipped
# while its dbmirror_pending rows are still deleted; a negative value asks
# for a backwards fetch on a NO SCROLL cursor.  Reject both up front.
if ($limit < 1)
{
    print STDERR "--limit must be at least 1 (got $limit)\n";
    exit 2;
}

# Autoflush STDOUT so every print/printf reaches the terminal immediately.
$| = 1;
print scalar(localtime) . " : Processing replication changes\n";

# Turn an interrupt (Ctrl-C) into an ordinary die.
$SIG{INT} = sub { die "SIGINT\n" };

# Running totals of applied statements, overall and per table.
# Each %bytable value is an array ref: [inserts, updates, deletes].
my $ins = 0;
my $del = 0;
my $upd = 0;
my %bytable;

my $c = MusicBrainz::Server::Context->new(database => 'READWRITE');

# Three Sql wrappers, all constructed from $c->conn:
#   $sql   - the list of pending transactions and housekeeping queries
#   $sql2  - the per-transaction cursor over pending row edits
#   $slave - the statements that apply the changes
my ($sql, $sql2, $slave) = map { Sql->new($c->conn) } 1 .. 3;

# Session set-up; each statement is executed under auto_commit.
# NOTE(review): PostgreSQL documents SET TRANSACTION and SET CONSTRAINTS as
# affecting only the current transaction.  Issued under auto_commit they are
# presumably outside any transaction block - confirm they have the intended
# effect on the transactions opened later.
for my $setting (
    "SET search_path = musicbrainz",
    "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE",
    "SET CONSTRAINTS ALL DEFERRED",
)
{
    $sql->auto_commit;
    $sql->do($setting);
}

# Build the work list: one row per pending transaction (XID), ordered by an
# approximation of its commit time.  The approximation is the SeqId of the
# last row edit recorded for that transaction, i.e. MAX(SeqId).
my $query = 'SELECT pd.XID,MAX(SeqId) FROM dbmirror_pending pd GROUP BY pd.XID ORDER BY MAX(pd.SeqId)';

# Total number of rows in dbmirror_pending.  It is only used as the
# denominator of the "est%" column in the progress display below.
my $total_statements = $sql->select_single_value(
    'SELECT COUNT(*) FROM dbmirror_pending',
);

# Walk the pending transactions in the order established above.  For each
# one, the pending row edits are read through a cursor on $sql2, applied via
# $slave, and the transaction's dbmirror_pending rows are then deleted and
# the whole unit committed.  The block is skipped entirely when select()
# returns a false value (presumably "no rows" - TODO confirm against Sql.pm).
if (my $totalrows = $sql->select($query))
{
    my @row;

    # Start time, used to compute the per-second rates in the progress line.
    use Time::HiRes qw( gettimeofday tv_interval );
    my $t1 = [gettimeofday];
    my $interval;       # seconds since $t1; undef until first refreshed below
    my $rows = 0;       # transactions (XIDs) committed so far
    my $strows = 0;     # successful mirrorCommand calls so far

    printf "%9s %9s %4s %9s %9s\n", "XIDs", "Stmts", "est%", "XIDs/sec", "Stmt/sec";

    # Progress reporter.  $pre and $post are prepended / appended to the
    # printf format (the callers pass "\r" to redraw the line in place).
    # It also records the time of the last report in $lasttime.
    my $lasttime;
    my $p = sub {
        my ($pre, $post) = @_;
        # Make sure the divisions below are not done under "use integer".
        no integer;
        printf $pre."%9d %9d %3d%% %9d %9d".$post,
                $rows, $strows,
                int(100 * $strows / $total_statements),
                # ($interval||1) avoids dividing by undef/zero before the
                # first measurable interval has elapsed.
                $rows / ($interval||1),
                $strows / ($interval||1),
                ;
        $lasttime = time;
    };

    # When any of the verbose flags is set, replace the reporter with one
    # that prints nothing and only updates $lasttime.
    # NOTE(review): presumably so the "\r" progress line does not interleave
    # with the per-statement / per-transaction output - confirm intent.
    $p = sub { $lasttime = time }
        if $short_sql or $long_sql or $debug_xact;

        # Initial report (all counters zero); also initialises $lasttime.
        $p->("", "");

    while (@row = $sql->next_row)
    {
        my $XID = $row[0];
        my $maxSeqId = $row[1];
        my $seqId;      # not used in this loop

        # --skip-seqid: ignore transactions whose last SeqId is <= N.
        # Nothing is applied or deleted for a skipped transaction.
        if ($skip_seqid and $maxSeqId <= $skip_seqid)
        {
                print localtime() . " : Ignoring SeqId #$maxSeqId\n";
                next;
        }

        # All row edits of this transaction joined with their packed data.
        # Ordering by SeqId then IsKey DESC means that, for a SeqId that has
        # both a key row and a non-key row, the key row is returned first;
        # mirrorUpdate relies on reading two consecutive rows.
        my $query = qq|
                SELECT  pnd.SeqId, pnd.TableName, pnd.Op, pnddata.IsKey, pnddata.Data AS Data
                FROM    dbmirror_pending pnd, dbmirror_pendingdata pnddata
                WHERE   pnd.SeqId = pnddata.SeqId
                AND             pnd.XID = ?
                ORDER BY SeqId, IsKey DESC
        |;

        # One database transaction per replicated transaction.  $slave and
        # $sql2 were both constructed from $c->conn; presumably that is the
        # same connection, which would make the cursor below part of the
        # transaction begun here - TODO confirm.
        $slave->begin;
        $sql2->do("DECLARE csr NO SCROLL CURSOR FOR $query", $XID);

        my @row2;
        # Fetch the next batch of at most $limit rows from the cursor into
        # $sql2, releasing the previous batch first.  The return value of
        # select() is used as the "more rows available" condition.
        my $pull = sub {
            $sql2->finish;
            $sql2->select("FETCH FORWARD $limit FROM csr");
        };
        while ($pull->())
        {
            while (@row2 = $sql2->next_row)
            {
                # $row2[2] is the Op column.  $pull is handed down so that
                # mirrorUpdate can fetch the next batch itself when the
                # second row of an UPDATE is not in the current one.
                # TODO: Figure out how to handle errors here
                if (!mirrorCommand($row2[2], $sql2, $slave, \@row2, $XID, $pull))
                {
                    die "Mirror command failed.\n";
                }
                ++$strows;
            }
        }
        $sql2->finish;

        # Everything for this XID has been applied: drop the cursor, remove
        # the transaction's rows from dbmirror_pending and commit.
        $sql2->do("CLOSE csr");
        $slave->do(qq[DELETE FROM dbmirror_pending WHERE XID = ?], $XID);

        print localtime() . " : COMMIT XID #$XID\n" if $debug_xact;
        $slave->commit;

        ++$rows;
        # Refresh the progress line when $rows is a multiple of 32 or when
        # the clock has moved on since the last report.
        unless ($rows & 0x1F and time() == $lasttime)
        {
                $interval = tv_interval($t1);
                $p->("\r", "");
        }
    }

    # Final report, followed by the total elapsed time.
    $interval = tv_interval($t1);
    $p->("\r", sprintf(" %.2f sec\n", $interval));

}
    # Release the transaction-list statement; runs whether or not the block
    # above was entered.
    $sql->finish;

# Report how many statements were applied, per table and in total.
# Each %bytable value holds [inserts, updates, deletes].
my $count_format = "%7d  %7d  %7d  %s\n";

print scalar(localtime) . " : Summary of changes applied:\n";
print "Inserts  Updates  Deletes  Table\n";
foreach my $name (sort keys %bytable)
{
    my ($table_ins, $table_upd, $table_del) = @{ $bytable{$name} };
    printf $count_format, $table_ins, $table_upd, $table_del, $name;
}
printf $count_format, $ins, $upd, $del, "(total)";

print scalar(localtime) . " : Replication changes applied\n";

# Both pending tables must be empty once every transaction has been applied.
# Leftover rows are fatal; otherwise the table is vacuumed and analysed.
foreach my $table ("dbmirror_pendingdata", "dbmirror_pending")
{
    print scalar(localtime) . " : Checking that $table is empty\n";

    my $remaining = $sql->select_single_value(qq{SELECT COUNT(*) FROM "$table"});
    if ($remaining)
    {
        die "$table still contains data (rows=$remaining)!";
    }

    print scalar(localtime) . " : Optimising $table\n";
    # The VACUUM is issued under auto_commit.
    $sql->auto_commit;
    $sql->do(qq{VACUUM ANALYZE "$table"});
}

print scalar(localtime) . " : ProcessReplicationChanges complete\n";
exit;

# Dispatch one pending row edit to the handler for its operation.
#
# Arguments:
#   $op       - operation code: 'i' (insert), 'd' (delete) or 'u' (update)
#   $result   - Sql handle the pending rows are being read from
#   $slave    - Sql handle the change is applied through
#   $row      - array ref for the current pending row; element 1 (the table
#               name) is rewritten in place with any leading "public".
#               qualifier removed
#   $transId  - XID of the transaction being replayed
#   $pull     - code ref that fetches the next batch of rows (updates only)
#
# Side effects: bumps the global $ins / $del / $upd totals and the matching
# slot of the per-table [inserts, updates, deletes] counters in %bytable.
#
# Returns whatever the handler returns, or 0 for an unrecognised operation.
sub mirrorCommand
{
    my ($op, $result, $slave, $row, $transId, $pull) = @_;

    # Normalise the table name in the row itself so the handlers see it too.
    $row->[1] =~ s/^"public"\.//;
    my $name = $row->[1];

    # The per-table entry is created on first sight, whatever the operation.
    my $counts = ($bytable{$name} ||= [0, 0, 0]);

    if ($op eq 'i')
    {
        $ins++;
        $counts->[0]++;
        return mirrorInsert($result, $slave, $row, $transId);
    }

    if ($op eq 'd')
    {
        $del++;
        $counts->[2]++;
        return mirrorDelete($result, $slave, $row, $transId);
    }

    if ($op eq 'u')
    {
        $upd++;
        $counts->[1]++;
        return mirrorUpdate($result, $slave, $row, $transId, $pull);
    }

    return 0;
}

use MusicBrainz::Server::dbmirror;

# Apply one replicated INSERT.
#
# Arguments:
#   $result   - Sql handle the pending rows are read from (unused here)
#   $slave    - Sql handle the INSERT is executed on
#   $row      - array ref for the pending row: element 0 is the SeqId,
#               element 1 the table name, element 4 the packed column data
#   $transId  - XID of the transaction being replayed (unused here)
#
# Returns 1.  Dies if the packed data cannot be unpacked.
sub mirrorInsert
{
    my ($result, $slave, $row, $transId) = @_;
    my ($seqId, $tableName) = @{$row}[0, 1];

    my $valuepairs = MusicBrainz::Server::dbmirror::unpack_data($row->[4], $seqId);
    die unless $valuepairs;

    my ($statement, $args) = MusicBrainz::Server::dbmirror::prepare_insert($tableName, $valuepairs);

    # Optional tracing, controlled by --short-sql / --long-sql.
    if ($short_sql)
    {
        print scalar(localtime) . " : INSERT INTO $tableName\n";
    }
    if ($long_sql)
    {
        show_long_sql($statement, $args);
    }

    $slave->do($statement, @$args);

    return 1;
}

# Apply one replicated DELETE.
#
# Arguments:
#   $result   - Sql handle the pending rows are read from (unused here)
#   $slave    - Sql handle the DELETE is executed on
#   $row      - array ref for the pending row: element 0 is the SeqId,
#               element 1 the table name, element 4 the packed key data
#   $transId  - XID of the transaction being replayed (unused here)
#
# Returns 1.  Dies if the packed data cannot be unpacked.
sub mirrorDelete
{
    my ($result, $slave, $row, $transId) = @_;
    my ($seqId, $tableName) = @{$row}[0, 1];

    my $keypairs = MusicBrainz::Server::dbmirror::unpack_data($row->[4], $seqId);
    die unless $keypairs;

    my ($statement, $args) = MusicBrainz::Server::dbmirror::prepare_delete($tableName, $keypairs);

    # Optional tracing, controlled by --short-sql / --long-sql.
    if ($short_sql)
    {
        print scalar(localtime) . " : DELETE FROM $tableName\n";
    }
    if ($long_sql)
    {
        show_long_sql($statement, $args);
    }

    $slave->do($statement, @$args);

    return 1;
}

# Apply one replicated UPDATE.
#
# An UPDATE is read as two consecutive rows: the row passed in as $row,
# whose packed data supplies the key columns, and the row that follows it
# on $result, whose packed data supplies the new column values.
#
# Arguments:
#   $result   - Sql handle the pending rows are read from; one further row
#               is consumed from it
#   $slave    - Sql handle the UPDATE is executed on
#   $row      - array ref for the key row: element 0 is the SeqId,
#               element 1 the table name, element 4 the packed key data
#   $transId  - XID of the transaction being replayed (unused here)
#   $pull     - code ref that loads the next batch of rows into $result;
#               called when the current batch ends between the two rows
#
# Returns 1.  Dies if either packed value cannot be unpacked or if the
# second row is missing.
sub mirrorUpdate
{
    my ($result, $slave, $row, $transId, $pull) = @_;
    my $seqId = $row->[0];
    my $tableName = $row->[1];

    my $keypairs = MusicBrainz::Server::dbmirror::unpack_data($row->[4], $seqId)
        or die "Unable to unpack key data for UPDATE of $tableName (SeqId $seqId)";

    # The values row may be the first row of the next batch.
    my @row2 = $result->next_row;
    if (!@row2) {
        unless ($pull->() and @row2 = $result->next_row) {
            die 'Unable to find required data to perform UPDATE';
        }
    }

    # Check this result as well: an unchecked failure here would hand an
    # undefined value to prepare_update instead of stopping with a clear
    # error, unlike the key data above and the insert / delete handlers.
    # NOTE(review): @row2 is assumed to belong to the same SeqId as $row;
    # that is not verified here.
    my $valuepairs = MusicBrainz::Server::dbmirror::unpack_data($row2[4], $seqId)
        or die "Unable to unpack new values for UPDATE of $tableName (SeqId $seqId)";

    my ($statement, $args) = MusicBrainz::Server::dbmirror::prepare_update($tableName, $valuepairs, $keypairs);

    # Optional tracing, controlled by --short-sql / --long-sql.
    print localtime() . " : UPDATE $tableName\n" if $short_sql;
    show_long_sql($statement, $args) if $long_sql;
    $slave->do($statement, @$args);

    return 1;
}

# Print a timestamped trace line for one statement:
#   "<localtime> : <statement> (<arg> <arg> ...)"
# Bind values are separated by single spaces; undefined values are shown
# as the word NULL.
#
# Arguments:
#   $statement - SQL text
#   $args      - array ref of bind values
sub show_long_sql
{
    my ($statement, $args) = @_;

    my @printable;
    for my $value (@$args)
    {
        push @printable, defined($value) ? $value : "NULL";
    }

    my $stamp = localtime;
    printf "%s : %s (%s)\n", $stamp, $statement, join(" ", @printable);
}

1;
# eof ProcessReplicationChanges
