/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={MediumTests.class})
public class TestReplicator
extends TestReplicationBase {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicator.class);
    static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
    static final int NUM_ROWS = 10;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf1.setInt("hbase.ipc.max.request.size", 10240);
        TestReplicationBase.setUpBeforeClass();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReplicatorBatching() throws Exception {
        this.truncateTable(utility1, tableName);
        this.truncateTable(utility2, tableName);
        admin.addPeer("testReplicatorBatching", new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()).setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
        ReplicationEndpointForTest.setBatchCount(0);
        ReplicationEndpointForTest.setEntriesCount(0);
        try {
            ReplicationEndpointForTest.pause();
            try {
                byte[] valueBytes = new byte[8192];
                for (int i = 0; i < 10; ++i) {
                    htable1.put(new Put(Bytes.toBytes((String)("row" + Integer.toString(i)))).addColumn(famName, null, valueBytes));
                }
            }
            finally {
                ReplicationEndpointForTest.resume();
            }
            Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.ExplainingPredicate<Exception>(){

                public boolean evaluate() throws Exception {
                    LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
                    return ReplicationEndpointForTest.getBatchCount() >= 10;
                }

                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of 10 entries";
                }
            });
            Assert.assertEquals((String)"We sent an incorrect number of batches", (long)10L, (long)ReplicationEndpointForTest.getBatchCount());
            Assert.assertEquals((String)"We did not replicate enough rows", (long)10L, (long)utility2.countRows(htable2));
        }
        finally {
            admin.removePeer("testReplicatorBatching");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReplicatorWithErrors() throws Exception {
        this.truncateTable(utility1, tableName);
        this.truncateTable(utility2, tableName);
        admin.addPeer("testReplicatorWithErrors", new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()).setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()), null);
        FailureInjectingReplicationEndpointForTest.setBatchCount(0);
        FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
        try {
            FailureInjectingReplicationEndpointForTest.pause();
            try {
                byte[] valueBytes = new byte[8192];
                for (int i = 0; i < 10; ++i) {
                    htable1.put(new Put(Bytes.toBytes((String)("row" + Integer.toString(i)))).addColumn(famName, null, valueBytes));
                }
            }
            finally {
                FailureInjectingReplicationEndpointForTest.resume();
            }
            Waiter.waitFor((Configuration)conf1, (long)60000L, (Waiter.Predicate)new Waiter.ExplainingPredicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= 10;
                }

                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of 10 entries";
                }
            });
            Assert.assertEquals((String)"We did not replicate enough rows", (long)10L, (long)utility2.countRows(htable2));
        }
        finally {
            admin.removePeer("testReplicatorWithErrors");
        }
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TestReplicationBase.tearDownAfterClass();
    }

    private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
        HBaseAdmin admin = util.getHBaseAdmin();
        admin.disableTable(tableName);
        admin.truncateTable(tablename, false);
    }

    public static class FailureInjectingReplicationEndpointForTest
    extends ReplicationEndpointForTest {
        private final AtomicBoolean failNext = new AtomicBoolean(false);

        @Override
        protected Callable<Integer> createReplicator(List<WAL.Entry> entries, int ordinal) {
            return () -> {
                if (this.failNext.compareAndSet(false, true)) {
                    int batchIndex = this.replicateEntries(entries, ordinal);
                    entriesCount += entries.size();
                    int count = batchCount.incrementAndGet();
                    LOG.info("Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
                    return batchIndex;
                }
                if (this.failNext.compareAndSet(true, false)) {
                    throw new ServiceException("Injected failure");
                }
                return ordinal;
            };
        }
    }

    public static class ReplicationEndpointForTest
    extends HBaseInterClusterReplicationEndpoint {
        protected static AtomicInteger batchCount = new AtomicInteger(0);
        protected static int entriesCount;
        private static final Object latch;
        private static AtomicBoolean useLatch;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void resume() {
            useLatch.set(false);
            Object object = latch;
            synchronized (object) {
                latch.notifyAll();
            }
        }

        public static void pause() {
            useLatch.set(true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void await() throws InterruptedException {
            if (useLatch.get()) {
                LOG.info("Waiting on latch");
                Object object = latch;
                synchronized (object) {
                    latch.wait();
                }
                LOG.info("Waited on latch, now proceeding");
            }
        }

        public static int getBatchCount() {
            return batchCount.get();
        }

        public static void setBatchCount(int i) {
            LOG.info("SetBatchCount=" + i + ", old=" + ReplicationEndpointForTest.getBatchCount());
            batchCount.set(i);
        }

        public static int getEntriesCount() {
            return entriesCount;
        }

        public static void setEntriesCount(int i) {
            LOG.info("SetEntriesCount=" + i);
            entriesCount = i;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                ReplicationEndpointForTest.await();
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted waiting for latch", (Throwable)e);
            }
            return super.replicate(replicateContext);
        }

        protected Callable<Integer> createReplicator(List<WAL.Entry> entries, int ordinal) {
            return () -> {
                int batchIndex = this.replicateEntries(entries, ordinal);
                entriesCount += entries.size();
                int count = batchCount.incrementAndGet();
                LOG.info("Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
                return batchIndex;
            };
        }

        static {
            latch = new Object();
            useLatch = new AtomicBoolean(false);
        }
    }
}

