/*
 * Copyright 2008-2009 the original author or authors.
 * The contents of this file are subject to the Mozilla Public License
 * Version 1.1 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
 * License for the specific language governing rights and limitations
 * under the License.
 */

package com.mtgi.analytics;

import java.util.LinkedList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

import com.mtgi.analytics.servlet.SpringSessionContext;

/**
 * <p>Standard implementation of {@link BehaviorTrackingManager}.  BehaviorEvent
 * instances are asynchronously committed to a BehaviorEventPersister when they are
 * complete; user and session IDs for the events are provided by an implementation
 * of {@link SessionContext}.</p>
 *
 * <p>Event persistence occurs when {@link #flush()} is called (either via JMX control
 * or by the Quartz scheduler), or when the queue of uncommitted events exceeds
 * the configured threshold value.  The flush threshold can be configured with
 * {@link #setFlushThreshold(int)}.</p>
 */
@ManagedResource(objectName="com.mtgi:group=analytics,name=BehaviorTracking",
                 description="Monitor and control user behavior tracking")
public class BehaviorTrackingManagerImpl implements BehaviorTrackingManager, InitializingBean, BeanNameAware {

    /*
     * Implementation is complicated by many issues, the first of which is performance.
     * Interface method calls must return as quickly as possible to prevent measurement
     * from interfering with application responsiveness.  Our only really expensive
     * operation is event persistence flush, so we do this asynchronously on a TaskExecutor.
     * We also log an event for each flush, so that if behavior tracking does start
     * to consume significant resources, we can find evidence of that in the performance
     * database and tune accordingly.
     *
     * Another issue is memory.  We anticipate periodic flushing of completed events,
     * but if activity temporarily exceeds anticipated levels, we want to avoid an excessive
     * backlog of completed events waiting in memory.  So, we add a configurable flush
     * threshold.  When more than the threshold number of events are finished, we automatically
     * flush.  We also allow child events to be persisted before their parents complete, to avoid
     * accumulating too many events in memory for long-running batch processes.
     *
     * We also must worry about thread-safety.  Fortunately this one is
     * fairly easy for us.  By definition each BehaviorEvent is tied to exactly one thread,
     * so we can leave its implementation completely unsynchronized.  The only place where
     * we have contention among threads is as completed events are added to the persistence
     * queue, so we must add some synchronization there.  We enter synchronized blocks
     * sparingly and leave quickly to avoid creating bottlenecks.
     *
     * Finally, and perhaps of most concern, we have the issue of event lifecycle contracts.
     * Attempting to start or stop created events multiple times or out of sequence, to stop
     * a parent event before all of its children are finished, to start child events before
     * their parents, etc, can screw up our internal bookkeeping.  In the worst case this
     * can create memory leaks in the form of unfinished behavior events tied to thread
     * local storage.  At best it means that the performance database contains unreliable information.
     *
     * To guard against this, first the system is designed so that direct API calls from
     * applications should never be necessary.  Secondly, we do low-cost bookkeeping checks
     * on entry to lifecycle methods to make sure that everything is being done in the
     * proper order, both here and in BehaviorEvent.
     */

    private static final Log log = LogFactory.getLog(BehaviorTrackingManagerImpl.class);

    //whether the "flush called from an application thread" warning has been logged.
    //volatile because flush() can run on any thread (executor, scheduler, JMX).
    private volatile boolean warned;

    private String name;
    private SessionContext sessionContext;
    private BehaviorEventPersister persister;
    private String application;
    private int flushThreshold = 100;
    private TaskExecutor executor;

    //tracks the currently executing event on the calling thread
    private final ThreadLocal<BehaviorEvent> event = new ThreadLocal<BehaviorEvent>();

    //accumulates completed events (every event passed to stop(), not only
    //root-level ones) waiting to be persisted.  guarded by bufferSync.
    private LinkedList<BehaviorEvent> writeBuffer = new LinkedList<BehaviorEvent>();
    private final Object bufferSync = new Object();
    //number of total completed events since the last flush job was queued.
    //only modified while holding bufferSync; volatile so it can be read without the lock.
    private volatile int pendingFlush = 0;
    //whether a flush has been requested since the last flush was run.
    private volatile boolean flushRequested = false;

    //whether logging has been temporarily suspended.
    private volatile boolean suspended = false;

    //task executor job to flush events to the database.
    private final Runnable flushJob = new Runnable() {
        public void run() {
            flush();
        }
    };

    public BehaviorTrackingManagerImpl() {
    }

    /** Records the bean name supplied through {@link BeanNameAware}. */
    public void setBeanName(String name) {
        this.name = name;
    }

    /** @return the name previously supplied to {@link #setBeanName(String)}, or null if none was set */
    public String getBeanName() {
        return name;
    }

    /**
     * Create a new event of the given type and name.  The parent of the new event
     * is whatever event is currently pending on the calling thread (null if there is none);
     * the application name, user ID and session ID are taken from this manager's
     * configuration and its {@link SessionContext}.  This method does not start the event;
     * see {@link #start(BehaviorEvent)}.
     */
    public BehaviorEvent createEvent(String type, String name) {
        //TODO: stack depth limits.  hand out a singleton dummy event if there are already
        //too many pending events waiting for closure.
        return new BehaviorEvent(event.get(), type, name, application,
                                 sessionContext.getContextUserId(),
                                 sessionContext.getContextSessionId());
    }

    /**
     * Start the given event and make it the pending event for the calling thread.
     *
     * @throws IllegalStateException if the parent of <code>evt</code> is not the event
     *         currently pending on the calling thread
     */
    public void start(BehaviorEvent evt) {
        //check bookkeeping to prevent a rogue application from screwing up our internal state.
        if (evt.getParent() != event.get())
            throw new IllegalStateException("Attempted to start an event that is not a child of the pending event");

        evt.start();
        //push event on stack.
        event.set(evt);
    }

    /**
     * Stop the given event, restore its parent as the pending event for the calling
     * thread, and (unless logging is suspended) queue the event for persistence.
     * An asynchronous flush is requested if the flush threshold has been reached.
     *
     * @throws IllegalStateException if <code>evt</code> is not the event currently
     *         pending on the calling thread
     */
    public void stop(BehaviorEvent evt) {
        //check bookkeeping to prevent a rogue application from screwing up our internal state.
        BehaviorEvent current = event.get();
        if (evt != current)
            throw new IllegalStateException("Attempted to stop an event that is not the current event on this thread: got " + evt + " but expected " + current);

        try {
            evt.stop();
        } finally {
            //pop the event stack, even if stop() failed.
            restoreCurrentEvent(evt.getParent());
        }

        //if logging has been suspended, we just discard the finished event.
        if (!suspended) {
            //put event on the write queue and check if the flush
            //threshold has been crossed.
            synchronized (bufferSync) {
                ++pendingFlush;
                writeBuffer.add(evt);
            }
            flushIfNeeded();
        }
    }

    @ManagedAttribute(description="Returns true if event logging has been temporarily disabled with the suspend() operation.")
    public boolean isSuspended() {
        return suspended;
    }

    @ManagedOperation(description="Temporarily suspend logging of behavior events.")
    public String suspend() {
        suspended = true;
        return "Event logging temporarily suspended.  Use resume() to resume logging.";
    }

    @ManagedOperation(description="Resume logging of behavior events after a previous call to suspend().")
    public String resume() {
        suspended = false;
        return "Event logging resumed.";
    }

    /**
     * Flush any completed events to the event persister.  This operation can be called
     * manually via JMX, or can be called on a fixed interval via the Quartz Scheduler.
     * This operation results in the logging of a "flush" event to the database.
     *
     * @return the number of events persisted
     */
    @ManagedOperation(description="Immediately flush all completed events to the behavior tracking database.  Returns the number of events written to the database (not counting the flush event that is also logged)")
    public int flush() {

        LinkedList<BehaviorEvent> oldList = null;
        //rotate the buffer.
        synchronized (bufferSync) {
            oldList = writeBuffer;
            pendingFlush -= oldList.size();
            writeBuffer = new LinkedList<BehaviorEvent>();
            flushRequested = false;
        }

        //prevent no-ops from spewing a bunch of noise into the logs.
        if (oldList.isEmpty())
            return 0;

        //we log flush events, so that we can correlate flush events to system
        //resource spikes, and also see evidence of behavior tracking
        //churn in the database if tuning parameters aren't set correctly.

        //we don't call our own start/stop/createEvents methods, because that could
        //recursively lead to another flush() or other nasty problems if the flush
        //threshold is set too small
        BehaviorEvent flushEvent = new FlushEvent(event.get());
        if (!warned && !flushEvent.isRoot()) {
            warned = true;
            log.warn("Flush is being called from inside an application thread!  It is strongly advised the flush only be called from a dedicated, reduced-priority thread pool (are you using a SyncTaskExecutor in your spring configuration?).");
        }
        EventDataElement data = flushEvent.addData();
        flushEvent.start();

        //stays at -1 if the persister fails, so the flush event records the failure.
        int count = -1;
        event.set(flushEvent);
        try {

            count = persister.persist(oldList);
            if (log.isDebugEnabled())
                log.debug("Flushed " + count + " events with " + pendingFlush + " remaining");

            return count;

        } finally {
            //restore stack state
            restoreCurrentEvent(flushEvent.getParent());

            data.add("count", count);
            flushEvent.stop();

            //persist the flush event immediately.
            LinkedList<BehaviorEvent> temp = new LinkedList<BehaviorEvent>();
            temp.add(flushEvent);
            persister.persist(temp);
        }
    }

    /**
     * Make <code>parent</code> the pending event for the calling thread.  When there
     * is no parent the thread-local entry is removed rather than set to null, so
     * that pooled threads do not retain an entry for this manager between requests.
     */
    private void restoreCurrentEvent(BehaviorEvent parent) {
        if (parent == null)
            event.remove();
        else
            event.set(parent);
    }

    /**
     * Queue an asynchronous flush on the executor if the flush threshold has been
     * reached and no flush request is already outstanding.
     */
    private void flushIfNeeded() {
        synchronized (bufferSync) {
            //avoid queueing up duplicate requests by checking the 'flushRequested' flag.
            if (flushRequested)
                return;
            if (writeBuffer.isEmpty() || pendingFlush < flushThreshold)
                return;
            flushRequested = true;
            if (log.isDebugEnabled())
                log.debug("requesting autoflush with " + pendingFlush + " events awaiting save");
        }

        //hand the job to the executor outside of the lock.  if the hand-off fails
        //(for example the executor rejects the task), clear the request flag so that
        //a later stop() can try again; otherwise automatic flushing would stay
        //disabled until somebody else happened to call flush().  the failure itself
        //still propagates to the caller.
        boolean queued = false;
        try {
            executor.execute(flushJob);
            queued = true;
        } finally {
            if (!queued) {
                synchronized (bufferSync) {
                    flushRequested = false;
                }
            }
        }
    }

    @ManagedAttribute(description="The application name for events published by this manager")
    public String getApplication() {
        return application;
    }

    @ManagedAttribute(description="The number of completed events not yet flushed")
    public int getEventsPendingFlush() {
        return pendingFlush;
    }

    /**
     * Set the name of the application in which this manager operates, for
     * logging purposes.  This will be the value of {@link BehaviorEvent#getApplication()}
     * for all events created by this manager.
     */
    @Required
    public void setApplication(String application) {
        this.application = application;
    }

    /**
     * Set a session context for the application, used to determine the
     * current user and session ID for a calling thread.
     */
    public void setSessionContext(SessionContext sessionContext) {
        this.sessionContext = sessionContext;
    }

    public SessionContext getSessionContext() {
        return sessionContext;
    }

    /**
     * Provide a persister for saving finished events to the behavior tracking database.
     * @param persister the persister that receives batches of completed events
     */
    @Required
    public void setPersister(BehaviorEventPersister persister) {
        this.persister = persister;
    }

    public BehaviorEventPersister getPersister() {
        return persister;
    }

    /**
     * Provide a task executor on which persistence operations will be performed.
     */
    @Required
    public void setExecutor(TaskExecutor executor) {
        this.executor = executor;
    }

    public TaskExecutor getExecutor() {
        return executor;
    }

    /**
     * Specify the maximum number of completed events to queue in memory before
     * forcing a flush to the persister.  Default is 100 if unspecified.
     *
     * Note that this value is treated as advice and not strictly obeyed.
     * For example, additional events may accumulate during the time it takes to
     * rotate the event buffer after the flush threshold is first observed crossed.
     *
     * In other words, persister implementations must not assume that the flush
     * threshold is a hard upper limit on the batch size of persistence operations.
     */
    public void setFlushThreshold(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    /**
     * Install a {@link SpringSessionContext} as the session context if none
     * was configured explicitly.
     */
    public void afterPropertiesSet() throws Exception {
        if (sessionContext == null) {
            log.info("No sessionContext specified, using default implementation " + SpringSessionContext.class.getName());
            sessionContext = new SpringSessionContext();
        }
    }

    /**
     * Event of type "behavior-tracking" and name "flush", created by {@link #flush()}
     * to record each flush operation.
     */
    protected class FlushEvent extends BehaviorEvent {

        private static final long serialVersionUID = 3182195013219330932L;

        protected FlushEvent(BehaviorEvent parent) {
            super(parent, "behavior-tracking", "flush", application, sessionContext.getContextUserId(), sessionContext.getContextSessionId());
        }

    }

}