package org.openorb.notify.queue;

import org.apache.avalon.framework.logger.Logger;
import org.omg.CORBA.OBJECT_NOT_EXIST;
import org.omg.CORBA.ORB;
import org.omg.CORBA.SystemException;
import org.omg.CosEventComm.Disconnected;
import org.openorb.notify.ConsumerAdapter;
import org.openorb.notify.ConsumerProxyManagement;
import org.openorb.notify.NotifyThread;
import org.openorb.notify.PropertiesRepository;
import org.openorb.notify.persistence.Event;

/* loaded from: input_file:org/openorb/notify/queue/Pusher.class */
public class Pusher extends NotifyThread {
    private static final int FOREVER = -1;
    private static final int DEFAULT_REDELIVERY_PERIOD = 1000;
    private static final int DEFAULT_DISCONNECT_ON_FAILURE_TIMEOUT = 30;
    private boolean m_isDeliveryPossible;
    private long m_lastDeliveryAttemptTime;
    protected final ORB m_orb;
    protected final EventQueue m_queue;
    private boolean m_isConnectionActive;
    protected final ConsumerProxyManagement m_proxy;
    private PropertiesRepository m_propertiesRepository;
    private int m_maxRedeliveryAttempts;
    private int m_retryMillis;
    private long m_lastDeliverySuccessTime;
    private long m_firstDeliveryFailureTime;
    private boolean m_hasBeenActivatedFlag;
    private ConsumerAdapter m_consumer;
    private long m_disconnectOnFailureTimeout;

    public Pusher(String str, EventQueue eventQueue, ConsumerProxyManagement consumerProxyManagement, ORB orb, Logger logger) {
        super(str, logger.getChildLogger("thread"));
        this.m_isDeliveryPossible = true;
        this.m_lastDeliveryAttemptTime = 0L;
        this.m_maxRedeliveryAttempts = -1;
        this.m_retryMillis = 1000;
        this.m_lastDeliverySuccessTime = 0L;
        this.m_firstDeliveryFailureTime = 0L;
        this.m_hasBeenActivatedFlag = false;
        this.m_consumer = null;
        this.m_disconnectOnFailureTimeout = 30000L;
        this.m_queue = eventQueue;
        this.m_proxy = consumerProxyManagement;
        this.m_orb = orb;
        if (this.m_orb instanceof org.openorb.orb.core.ORB) {
            this.m_disconnectOnFailureTimeout = 1000 * ((org.openorb.orb.core.ORB) this.m_orb).getLoader().getIntProperty("notify.resourcesTimeout", 30);
        }
    }

    public void setConnectionActive(boolean z) {
        boolean z2;
        synchronized (this.m_stateLock) {
            z2 = this.m_isConnectionActive;
            this.m_isConnectionActive = z;
        }
        if (!z || z2) {
            return;
        }
        this.m_hasBeenActivatedFlag = true;
        this.m_firstDeliveryFailureTime = 0L;
        if (!isRunning() || isEventQueueEmpty()) {
            return;
        }
        notifyThread();
    }

    private boolean isConnectionActive() {
        boolean z;
        synchronized (this.m_stateLock) {
            z = this.m_isConnectionActive;
        }
        return z;
    }

    private boolean hasConsumer() {
        boolean z;
        synchronized (this.m_stateLock) {
            z = this.m_consumer != null;
        }
        return z;
    }

    public void pushEvent(Object obj) {
        boolean pushEvent;
        synchronized (this.m_queue) {
            pushEvent = this.m_queue.pushEvent(obj);
        }
        if (!pushEvent) {
            this.m_proxy.reportEventDelivery(obj.hashCode());
            return;
        }
        if (isConnectionActive()) {
            if (!isRunning()) {
                doDelivery();
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            if (this.m_lastDeliverySuccessTime > 0 && currentTimeMillis - this.m_lastDeliverySuccessTime > this.m_disconnectOnFailureTimeout * 2 && getEventQueueSize() > 10) {
                getLogger().warn(new StringBuffer().append("Pusher has not processed queue for ").append(currentTimeMillis - this.m_lastDeliverySuccessTime).append("ms, disconnecting consumer ").append(this.m_proxy.getId()).toString());
                this.m_proxy.reportClientDisconnection();
                setRunning(false);
            }
            notifyThread();
        }
    }

    protected boolean doDelivery() {
        Event waitForEvent;
        if (!isConnectionActive()) {
            if (this.m_disconnectOnFailureTimeout <= 0 || this.m_hasBeenActivatedFlag) {
                return false;
            }
            if (this.m_firstDeliveryFailureTime <= 0) {
                this.m_firstDeliveryFailureTime = System.currentTimeMillis();
                return false;
            }
            if (System.currentTimeMillis() - this.m_firstDeliveryFailureTime <= this.m_disconnectOnFailureTimeout || this.m_hasBeenActivatedFlag) {
                return false;
            }
            getLogger().error(new StringBuffer().append("connection not activated before timeout, disconnecting consumer ").append(this.m_proxy.getId()).toString());
            this.m_proxy.reportClientDisconnection();
            setRunning(false);
            return true;
        }
        if (this.m_isDeliveryPossible) {
            waitForEvent = waitForEvent();
        } else {
            long currentTimeMillis = System.currentTimeMillis();
            if (this.m_retryMillis <= (this.m_lastDeliveryAttemptTime < currentTimeMillis ? currentTimeMillis - this.m_lastDeliveryAttemptTime : (Long.MAX_VALUE - this.m_lastDeliveryAttemptTime) + currentTimeMillis)) {
                return false;
            }
            waitForEvent = waitForEvent();
        }
        if (waitForEvent == null) {
            return true;
        }
        if (!hasConsumer()) {
            this.m_proxy.reportEventDelivery(waitForEvent.hashCode());
            return true;
        }
        try {
            dispatchEvent(waitForEvent);
            this.m_lastDeliverySuccessTime = System.currentTimeMillis();
            this.m_isDeliveryPossible = true;
            this.m_firstDeliveryFailureTime = 0L;
            getLogger().debug("event successfully delivered to push consumer");
            return true;
        } catch (OBJECT_NOT_EXIST e) {
            getLogger().warn(new StringBuffer().append("push consumer does not exist, disconnecting ").append(this.m_proxy).toString(), e);
            this.m_proxy.reportEventDelivery(waitForEvent.hashCode());
            this.m_proxy.reportClientDisconnection();
            setRunning(false);
            return true;
        } catch (SystemException e2) {
            String id = this.m_proxy.getId();
            String stringBuffer = new StringBuffer().append("push consumer ").append(id).append(" temporarily unavailable during delivery of event ").append(waitForEvent.hashCode()).toString();
            if (this.m_firstDeliveryFailureTime == 0) {
                getLogger().warn(stringBuffer, e2);
            } else {
                getLogger().warn(new StringBuffer().append(stringBuffer).append(" since ").append(System.currentTimeMillis() - this.m_firstDeliveryFailureTime).append(" ms: ").append(e2).toString());
            }
            if (this.m_disconnectOnFailureTimeout > 0) {
                if (this.m_firstDeliveryFailureTime <= 0) {
                    this.m_firstDeliveryFailureTime = System.currentTimeMillis();
                } else if (disconnectProxyOnFailureTimeout(id, waitForEvent)) {
                    return true;
                }
            }
            PropertiesRepository propertiesRepository = getPropertiesRepository();
            if (propertiesRepository == null) {
                return true;
            }
            if (!propertiesRepository.isConnectionReliable() || !propertiesRepository.isEventReliable()) {
                getLogger().debug("QoS set to best effort delivery - dropping event");
                this.m_proxy.reportEventDelivery(waitForEvent.hashCode());
                return true;
            }
            if (!isRunning()) {
                this.m_isDeliveryPossible = false;
                this.m_lastDeliveryAttemptTime = System.currentTimeMillis();
                getLogger().debug("pushing event back to queue");
                this.m_queue.pushEvent(waitForEvent);
                return true;
            }
            int i = 0;
            while (true) {
                if (this.m_maxRedeliveryAttempts != -1 && i >= this.m_maxRedeliveryAttempts) {
                    if (!getLogger().isDebugEnabled()) {
                        return true;
                    }
                    getLogger().warn(new StringBuffer().append("push consumer ").append(this.m_proxy.getId()).append(" not available after ").append(this.m_maxRedeliveryAttempts).append(" retries - giving up and dropping event").toString());
                    this.m_proxy.reportEventDelivery(waitForEvent.hashCode());
                    return true;
                }
                try {
                    Thread.sleep(this.m_retryMillis);
                    if (getLogger().isDebugEnabled()) {
                        getLogger().debug(new StringBuffer().append("retrying event delivery to push consumer ").append(id).toString());
                    }
                    dispatchEvent(waitForEvent);
                    this.m_lastDeliverySuccessTime = System.currentTimeMillis();
                    this.m_firstDeliveryFailureTime = 0L;
                    if (!getLogger().isDebugEnabled()) {
                        return true;
                    }
                    getLogger().debug(new StringBuffer().append("event delivery to push consumer ").append(id).append(" finally successful").toString());
                    return true;
                } catch (InterruptedException e3) {
                    i++;
                } catch (OBJECT_NOT_EXIST e4) {
                    if (getLogger().isDebugEnabled()) {
                        getLogger().debug(new StringBuffer().append("push consumer ").append(id).append(" does not exist, disconnecting it").toString(), e4);
                    }
                    this.m_proxy.reportEventDelivery(waitForEvent.hashCode());
                    this.m_proxy.reportClientDisconnection();
                    setRunning(false);
                    return true;
                } catch (SystemException e5) {
                    getLogger().debug("push consumer still unavailable", e5);
                    if (this.m_firstDeliveryFailureTime <= 0) {
                        this.m_firstDeliveryFailureTime = System.currentTimeMillis();
                    } else if (disconnectProxyOnFailureTimeout(id, waitForEvent)) {
                        return true;
                    }
                    i++;
                }
            }
        }
    }

    private boolean disconnectProxyOnFailureTimeout(String str, Event event) {
        long currentTimeMillis = System.currentTimeMillis() - this.m_firstDeliveryFailureTime;
        if (currentTimeMillis <= this.m_disconnectOnFailureTimeout) {
            return false;
        }
        getLogger().warn(new StringBuffer().append("event delivery has failed for ").append(currentTimeMillis).append(" milliseconds, disconnecting consumer ").append(str).toString());
        this.m_proxy.reportEventDelivery(event.hashCode());
        this.m_proxy.reportClientDisconnection();
        setRunning(false);
        return true;
    }

    public void setConsumer(ConsumerAdapter consumerAdapter) {
        synchronized (this.m_stateLock) {
            this.m_consumer = consumerAdapter;
        }
    }

    private void dispatchEvent(Event event) {
        int hashCode = event.hashCode();
        try {
            this.m_consumer.push(event);
            this.m_proxy.reportEventDelivery(hashCode);
        } catch (Disconnected e) {
            this.m_proxy.reportEventDelivery(hashCode);
            try {
                this.m_consumer.disconnect();
            } catch (SystemException e2) {
                getLogger().error("disconnect push consumer failed", e2);
            } finally {
                this.m_proxy.reportClientDisconnection();
                setRunning(false);
            }
        }
    }

    @Override // org.openorb.notify.NotifyThread, java.lang.Runnable
    public void run() {
        while (!finishRunning()) {
            if (!doDelivery()) {
                try {
                    synchronized (this.m_queue) {
                        this.m_queue.wait(this.m_disconnectOnFailureTimeout);
                    }
                } catch (InterruptedException e) {
                }
            }
        }
        setConsumer(null);
        clearEventQueue();
    }

    private void clearEventQueue() {
        Event event;
        boolean isEmpty;
        if (isEventQueueEmpty()) {
            return;
        }
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("clearing event queue");
        }
        do {
            synchronized (this.m_queue) {
                event = (Event) this.m_queue.pullEvent();
                isEmpty = this.m_queue.isEmpty();
            }
            this.m_proxy.reportEventDelivery(event.hashCode());
        } while (!isEmpty);
    }

    private void notifyThread() {
        synchronized (this.m_queue) {
            this.m_queue.notifyAll();
        }
    }

    private Event waitForEvent() {
        Event event;
        synchronized (this.m_queue) {
            if (this.m_queue.isEmpty() && isRunning()) {
                try {
                    this.m_queue.wait(this.m_disconnectOnFailureTimeout);
                } catch (InterruptedException e) {
                }
            }
            event = this.m_queue.isEmpty() ? null : (Event) this.m_queue.pullEvent();
        }
        return event;
    }

    private boolean isEventQueueEmpty() {
        boolean isEmpty;
        synchronized (this.m_queue) {
            isEmpty = this.m_queue.isEmpty();
        }
        return isEmpty;
    }

    private int getEventQueueSize() {
        int queueSize;
        synchronized (this.m_queue) {
            queueSize = this.m_queue.getQueueSize();
        }
        return queueSize;
    }

    private boolean finishRunning() {
        return (shouldFinishWork() && isEventQueueEmpty()) || !isRunning();
    }

    @Override // org.openorb.notify.NotifyThread, org.openorb.notify.ThreadManagement
    public void stopThread() {
        super.stopThread();
        notifyThread();
    }

    @Override // org.openorb.notify.NotifyThread, org.openorb.notify.ThreadManagement
    public void finishWorkAndStopThread() {
        if (!isRunning()) {
            clearEventQueue();
        }
        super.finishWorkAndStopThread();
        notifyThread();
    }

    public void setPropertiesRepository(PropertiesRepository propertiesRepository) {
        synchronized (this.m_stateLock) {
            this.m_propertiesRepository = propertiesRepository;
        }
    }

    private PropertiesRepository getPropertiesRepository() {
        PropertiesRepository propertiesRepository;
        synchronized (this.m_stateLock) {
            propertiesRepository = this.m_propertiesRepository;
        }
        return propertiesRepository;
    }
}
