001    /* 
002     * Copyright 2008-2009 the original author or authors.
003     * The contents of this file are subject to the Mozilla Public License
004     * Version 1.1 (the "License"); you may not use this file except in
005     * compliance with the License. You may obtain a copy of the License at
006     * http://www.mozilla.org/MPL/
007     *
008     * Software distributed under the License is distributed on an "AS IS"
009     * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
010     * License for the specific language governing rights and limitations
011     * under the License.
012     */
013     
014    package com.mtgi.analytics;
015    
016    import java.util.LinkedList;
017    
018    import org.apache.commons.logging.Log;
019    import org.apache.commons.logging.LogFactory;
020    import org.springframework.beans.factory.BeanNameAware;
021    import org.springframework.beans.factory.InitializingBean;
022    import org.springframework.beans.factory.annotation.Required;
023    import org.springframework.core.task.TaskExecutor;
024    import org.springframework.jmx.export.annotation.ManagedAttribute;
025    import org.springframework.jmx.export.annotation.ManagedOperation;
026    import org.springframework.jmx.export.annotation.ManagedResource;
027    
028    import com.mtgi.analytics.servlet.SpringSessionContext;
029    
030    /**
031     * <p>Standard implementation of {@link BehaviorTrackingManager}.  BehaviorEvent
032     * instances are asynchronously committed to a BehaviorEventPersister when they are
033     * complete; user and session IDs for the events are provided by an implementation
034     * of {@link SessionContext}.</p>
035     * 
036     * <p>Event persistence occurs when {@link #flush()} is called (either via JMX control
037     * or by the Quartz scheduler), or when the queue of uncommitted events exceeds
038     * the configured threshold value.  The flush threshold can be configured with
039     * {@link #setFlushThreshold(int)}.</p>
040     */
041    @ManagedResource(objectName="com.mtgi:group=analytics,name=BehaviorTracking", 
042                                     description="Monitor and control user behavior tracking")
043    public class BehaviorTrackingManagerImpl implements BehaviorTrackingManager, InitializingBean, BeanNameAware {
044    
045            /*
046             * Implementation is complicated by many issues, the first of which is performance.
047             * Interface method calls must return as quickly as possible to prevent measurement
048             * from interfering with application responsiveness.  Our only really expensive
049             * operation is event persistence flush, so we do this asynchronously on a TaskExecutor.
050             * We also log an event for each flush, so that if behavior tracking does start
051             * to consume significant resources, we can find evidence of that in the performance
052             * database and tune accordingly.
053             * 
054             * Another issue is memory.  We anticipate periodic flushing of completed events,
055             * but if activity temporarily exceeds anticipated levels, we want to avoid an excessive
056             * backlog of completed events waiting in memory.  So, we add a configurable flush
057             * threshold.  When more than the threshold number of events are finished, we automatically
058             * flush.  We also allow child events to be persisted before their parents complete, to avoid
059             * accumulating too many events in memory for long-running batch processes.
060             * 
061             * We also must worry about thread-safety.  Fortunately this one is
062             * fairly easy for us.  By definition each BehaviorEvent is tied to exactly one thread,
063             * so we can leave its implementation completely unsynchronized.  The only place where
064             * we have contention among threads is as completed events are added to the persistence
065             * queue, so we must add some synchronization there.  We enter synchronized blocks
066             * sparingly and leave quickly to avoid creating bottlenecks.
067             * 
068             * Finally, and perhaps of most concern, we have the issue of event lifecycle contracts.
069             * Attempting to start or stop created events multiple times or out of sequence, to stop 
070             * a parent event before all of its children are finished, to start child events before 
071             * their parents, etc, can screw up our internal bookkeeping.  In the worst case this 
072             * can create memory leaks in the form of unfinished behavior events tied to thread 
073             * local storage. At best it means that the performance database contains unreliable information.
074             * 
075             * To guard against this, first the system is designed so that direct API calls from
076             * applications should never be necessary.  Secondly, we do low-cost bookkeeping checks
077             * on entry to lifecycle methods to make sure that everything is being done in the
078             * proper order, both here and in BehaviorEvent.
079             */
080            
081            private static final Log log = LogFactory.getLog(BehaviorTrackingManagerImpl.class);
082    
083            private boolean warned;
084            
085            private String name;
086            private SessionContext sessionContext;
087            private BehaviorEventPersister persister;
088            private String application;
089            private int flushThreshold = 100;
090            private TaskExecutor executor;
091    
092            //tracks the currently executing event on the calling thread
093            private ThreadLocal<BehaviorEvent> event = new ThreadLocal<BehaviorEvent>();
094    
095            //accumulates completed root-level events waiting to be persisted
096            private LinkedList<BehaviorEvent> writeBuffer = new LinkedList<BehaviorEvent>();
097            private Object bufferSync = new Object();
098            //number of total completed events since the last flush job was queued.
099            private volatile int pendingFlush = 0;
100            //whether a flush has been requested since the last flush was run.
101            private volatile boolean flushRequested = false;
102            
103            //whether logging has been temporarily suspended.
104            private volatile boolean suspended = false;
105            
106            //task executor job to flush events to the database.
107            private Runnable flushJob = new Runnable() {
108                    public void run() {
109                            flush();
110                    }
111            };
112            
113            public BehaviorTrackingManagerImpl() {
114            }
115            
116            public void setBeanName(String name) {
117                    this.name = name;
118            }
119    
120            public String getBeanName() {
121                    return name;
122            }
123            
124            public BehaviorEvent createEvent(String type, String name) {
125                    //TODO: stack depth limits.  hand out a singleton dummy event if there are already
126                    //too many pending events waiting for closure.
127                    return new BehaviorEvent(event.get(), type, name, application, 
128                                                                     sessionContext.getContextUserId(), 
129                                                                     sessionContext.getContextSessionId());
130            }
131    
132            public void start(BehaviorEvent evt) {
133                    //check bookkeeping to prevent a rogue application from screwing up our internal state.
134                    if (evt.getParent() != event.get())
135                            throw new IllegalStateException("Attempted to start an event that is not a child of the pending event");
136    
137                    evt.start();
138                    //push event on stack.
139                    event.set(evt);
140            }
141            
142            public void stop(BehaviorEvent evt) {
143                    //check bookkeeping to prevent a rogue application from screwing up our internal state.
144                    BehaviorEvent current = event.get();
145                    if (evt != current)
146                            throw new IllegalStateException("Attempted to stop an event that is not the current event on this thread: got " + evt + " but expected " + current);
147    
148                    try {
149                            evt.stop();
150                    } finally {
151                            //pop the event stack
152                            event.set(evt.getParent());
153                    }
154    
155                    //if logging has been suspended, we just discard the finished event.
156                    if (!suspended) {
157                            //put event on the write queue and check if the flush
158                            //threshold has been crossed.
159                            synchronized (bufferSync) {
160                                    ++pendingFlush;
161                                    writeBuffer.add(evt);
162                            }
163                            flushIfNeeded();
164                    }
165            }
166    
167            @ManagedAttribute(description="Returns true if event logging has been temporarily disabled with the suspend() operation.")
168            public boolean isSuspended() {
169                    return suspended;
170            }
171            
172            @ManagedOperation(description="Temporarily suspend logging of behavior events.")
173            public String suspend() {
174                    suspended = true;
175                    return "Event logging temporarily suspended.  Use resume() to resume logging.";
176            }
177            
178            @ManagedOperation(description="Resume logging of behavior events after a previous call to suspend().")
179            public String resume() {
180                    suspended = false;
181                    return "Event logging resumed.";
182            }
183    
184            /**
185             * Flush any completed events to the event persister.  This operation can be called
186             * manually via JMX, or can be called on a fixed interval via the Quartz Scheduler.
187             * This operation results in the logging of a "flush" event to the database.
188             * 
189             * @return the number of events persisted
190             */
191            @ManagedOperation(description="Immediately flush all completed events to the behavior tracking database.  Returns the number of events written to the database (not counting the flush event that is also logged)")
192            public int flush() {
193                    
194                    LinkedList<BehaviorEvent> oldList = null;
195                    //rotate the buffer.
196                    synchronized(bufferSync) {
197                            oldList = writeBuffer;
198                            pendingFlush -= oldList.size();
199                            writeBuffer = new LinkedList<BehaviorEvent>();
200                            flushRequested = false;
201                    }
202                    
203                    //prevent no-ops from spewing a bunch of noise into the logs.
204                    if (oldList.isEmpty())
205                            return 0;
206    
207                    //we log flush events, so that we can correlate flush events to system
208                    //resource spikes, and also see evidence of behavior tracking
209                    //churn in the database if tuning parameters aren't set correctly.
210                    
211                    //we don't call our own start/stop/createEvents methods, because that could
212                    //recursively lead to another flush() or other nasty problems if the flush 
213                    //threshold is set too small
214                    BehaviorEvent flushEvent = new FlushEvent(event.get());
215                    if (!warned && !flushEvent.isRoot()) {
216                            warned = true;
217                            log.warn("Flush is being called from inside an application thread!  It is strongly advised the flush only be called from a dedicated, reduced-priority thread pool (are you using a SyncTaskExecutor in your spring configuration?).");
218                    }
219                    EventDataElement data = flushEvent.addData();
220                    flushEvent.start();
221    
222                    int count = -1;
223                    event.set(flushEvent);
224                    try {
225                            
226                            count = persister.persist(oldList);
227                            if (log.isDebugEnabled()) log.debug("Flushed " + count + " events with " + pendingFlush + " remaining");
228                            
229                            return count;
230                            
231                    } finally {
232                            //restore stack state
233                            event.set(flushEvent.getParent());
234                            
235                            data.add("count", count);
236                            flushEvent.stop();
237                            
238                            //persist the flush event immediately.
239                            LinkedList<BehaviorEvent> temp = new LinkedList<BehaviorEvent>();
240                            temp.add(flushEvent);
241                            persister.persist(temp);
242                    }
243            }
244            
245            private void flushIfNeeded() {
246                    boolean requestFlush = false;
247                    synchronized (bufferSync) {
248                            //avoid queueing up duplicate requests by checking the 'flushRequested' flag.
249                            if (flushRequested)
250                                    return;
251                            if (!writeBuffer.isEmpty() && pendingFlush >= flushThreshold) {
252                                    requestFlush = flushRequested = true;
253                                    if (log.isDebugEnabled()) 
254                                            log.debug("requesting autoflush with " + pendingFlush + " events awaiting save");
255                            }
256                    }
257                    if (requestFlush)
258                            executor.execute(flushJob);
259            }
260    
261            @ManagedAttribute(description="The application name for events published by this manager")
262            public String getApplication() {
263                    return application;
264            }
265            
266            @ManagedAttribute(description="The number of completed events not yet flushed")
267            public int getEventsPendingFlush() {
268                    return pendingFlush;
269            }
270            
271            /**
272             * Set the name of the application in which this manager operates, for
273             * logging purposes.  This will be the value of {@link BehaviorEvent#getApplication()}
274             * for all events created by this manager.
275             */
276            @Required
277            public void setApplication(String application) {
278                    this.application = application;
279            }
280    
281            /**
282             * Set a session context for the application, used to determine the
283             * current user and session ID for a calling thread.
284             */
285            public void setSessionContext(SessionContext sessionContext) {
286                    this.sessionContext = sessionContext;
287            }
288    
289            public SessionContext getSessionContext() {
290                    return sessionContext;
291            }
292    
293            /**
294             * Provide a persister for saving finished events to the behavior tracking database.
295             * @param persister
296             */
297            @Required
298            public void setPersister(BehaviorEventPersister persister) {
299                    this.persister = persister;
300            }
301    
302            public BehaviorEventPersister getPersister() {
303                    return persister;
304            }
305    
306            /**
307             * Provide a task executor on which persistence operations will be performed.
308             */
309            @Required
310            public void setExecutor(TaskExecutor executor) {
311                    this.executor = executor;
312            }
313    
314            public TaskExecutor getExecutor() {
315                    return executor;
316            }
317            
318            /**
319             * Specify the maximum number of completed events to queue in memory before
320             * forcing a flush to the persister.  Default is 100 if unspecified.
321             * 
322             * Note that this value is treated as advice and not strictly obeyed.
323             * For example, additional events may accumulate during the time it takes to
324             * rotate the event buffer after the flush threshold is first observed crossed.
325             * 
326             * In other words, persister implementations must not assume that the flush
327             * threshold is a hard upper limit on the batch size of persistence operations.
328             */
329            public void setFlushThreshold(int flushThreshold) {
330                    this.flushThreshold = flushThreshold;
331            }
332            
333            public void afterPropertiesSet() throws Exception {
334                    if (sessionContext == null) {
335                            log.info("No sessionContext specified, using default implementation " + SpringSessionContext.class.getName());
336                            sessionContext = new SpringSessionContext();
337                    }
338            }
339    
340            protected class FlushEvent extends BehaviorEvent {
341    
342                    private static final long serialVersionUID = 3182195013219330932L;
343    
344                    protected FlushEvent(BehaviorEvent parent) {
345                            super(parent, "behavior-tracking", "flush", application, sessionContext.getContextUserId(), sessionContext.getContextSessionId());
346                    }
347                    
348            }
349    
350    }