001 /* 002 * Copyright 2008-2009 the original author or authors. 003 * The contents of this file are subject to the Mozilla Public License 004 * Version 1.1 (the "License"); you may not use this file except in 005 * compliance with the License. You may obtain a copy of the License at 006 * http://www.mozilla.org/MPL/ 007 * 008 * Software distributed under the License is distributed on an "AS IS" 009 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the 010 * License for the specific language governing rights and limitations 011 * under the License. 012 */ 013 014 package com.mtgi.analytics; 015 016 import static org.springframework.jdbc.support.JdbcUtils.closeResultSet; 017 import static org.springframework.jdbc.support.JdbcUtils.closeStatement; 018 import static org.springframework.jdbc.support.JdbcUtils.supportsBatchUpdates; 019 020 import java.sql.Connection; 021 import java.sql.PreparedStatement; 022 import java.sql.ResultSet; 023 import java.sql.SQLException; 024 import java.sql.Types; 025 import java.util.Queue; 026 027 import javax.xml.stream.XMLOutputFactory; 028 029 import org.springframework.dao.DataAccessException; 030 import org.springframework.jdbc.core.ConnectionCallback; 031 import org.springframework.jdbc.core.support.JdbcDaoSupport; 032 import org.springframework.jdbc.datasource.ConnectionProxy; 033 034 import com.mtgi.analytics.sql.BehaviorTrackingConnectionProxy; 035 036 /** 037 * Basic implementation of {@link BehaviorEventPersister}, which uses JDBC 038 * batching to persist instances of {@link BehaviorEvent} using configurable 039 * insert SQL. An instance of {@link EventDataElementSerializer} is used 040 * to convert event data to XML documents for insertion. 041 */ 042 public class JdbcBehaviorEventPersisterImpl extends JdbcDaoSupport 043 implements BehaviorEventPersister { 044 045 private static final String DEFAULT_ID_SQL = "select SEQ_BEHAVIOR_TRACKING_EVENT.nextval from dual"; 046 private static final String DEFAULT_INSERT_SQL = 047 "insert into BEHAVIOR_TRACKING_EVENT " + 048 "(EVENT_ID, PARENT_EVENT_ID, APPLICATION, EVENT_TYPE, EVENT_NAME, EVENT_START, DURATION_MS, USER_ID, SESSION_ID, ERROR, EVENT_DATA) values " + 049 "( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; 050 051 private int batchSize = 25; 052 private String insertSql = DEFAULT_INSERT_SQL; 053 private String idSql = DEFAULT_ID_SQL; 054 055 private XMLOutputFactory xmlFactory; 056 057 /** 058 * Set the JDBC batch size for executing inserts. Only has effect if the JDBC driver 059 * supports statement batching. Defaults to 25 if unspecified. 060 */ 061 public void setBatchSize(int batchSize) { 062 this.batchSize = batchSize; 063 } 064 065 /** 066 * Set the SQL select statement used to retrieve a new primary key value for an event 067 * prior to insert. Default is 068 * <pre>select S_BEHAVIOR_TRACKING_EVENT.nextval from dual</pre> 069 * if unspecified. 070 */ 071 public void setIdSql(String idSql) { 072 this.idSql = idSql; 073 } 074 075 public String getIdSql() { 076 return idSql; 077 } 078 079 /** 080 * Set the SQL statement used to insert a new behavior event record into the database. 081 * The SQL statement must take exactly 11 parameters, which must accept the following 082 * event values in order: 083 * <ol> 084 * <li>EVENT ID</li> 085 * <li>PARENT EVENT ID</li> 086 * <li>APPLICATION</li> 087 * <li>TYPE</li> 088 * <li>NAME</li> 089 * <li>START</li> 090 * <li>DURATION</li> 091 * <li>USER ID</li> 092 * <li>SESSION ID</li> 093 * <li>ERROR</li> 094 * <li>DATA</li> 095 * </ol> 096 * 097 * Default is 098 * <pre>insert into BEHAVIOR_TRACKING_EVENT 099 * (EVENT_ID, PARENT_EVENT_ID, APPLICATION, TYPE, NAME, START, DURATION_MS, USER_ID, SESSION_ID, ERROR, DATA) 100 * values 101 * ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 102 * </pre> 103 */ 104 public void setInsertSql(String insertSql) { 105 this.insertSql = insertSql; 106 } 107 108 @Override 109 protected void initDao() throws Exception { 110 super.initDao(); 111 this.xmlFactory = XMLOutputFactory.newInstance(); 112 } 113 114 public int persist(final Queue<BehaviorEvent> events) { 115 116 int count = 0; 117 if (!events.isEmpty()) { 118 119 count = (Integer)getJdbcTemplate().execute(new ConnectionCallback() { 120 121 public Object doInConnection(Connection con) throws SQLException, DataAccessException { 122 123 //if this connection is behavior tracking, suspend tracking. 124 //we don't generate more events while persisting. 125 BehaviorTrackingConnectionProxy bt = null; 126 for (Connection c = con; 127 bt == null && c instanceof ConnectionProxy; 128 c = ((ConnectionProxy)c).getTargetConnection()) 129 { 130 if (c instanceof BehaviorTrackingConnectionProxy) { 131 bt = (BehaviorTrackingConnectionProxy)c; 132 bt.suspendTracking(); 133 } 134 } 135 136 try { 137 int tally = 0; //return total events persisted. 138 139 boolean doBatch = supportsBatchUpdates(con); 140 EventDataElementSerializer dataSerializer = new EventDataElementSerializer(xmlFactory); 141 142 PreparedStatement insert = con.prepareStatement(insertSql); 143 try { 144 145 PreparedStatement idQuery = con.prepareStatement(idSql); 146 try { 147 148 //keep track of statements added to the batch so that we can time our 149 //flushes. 150 int batchCount = 0; 151 152 //go until the queue is drained. 153 while (!events.isEmpty()) { 154 155 //pop the next event off of the queue. event may already have an ID assigned if any 156 //of its child events has been persisted. 157 BehaviorEvent next = events.remove(); 158 assignIds(next, idQuery); 159 160 //populate identifying information for the event into the insert statement. 161 insert.setLong(1, (Long)next.getId()); 162 163 BehaviorEvent parent = next.getParent(); 164 nullSafeSet(insert, 2, parent == null ? null : parent.getId(), Types.BIGINT); 165 166 insert.setString(3, next.getApplication()); 167 insert.setString(4, next.getType()); 168 insert.setString(5, next.getName()); 169 insert.setTimestamp(6, new java.sql.Timestamp(next.getStart().getTime())); 170 insert.setLong(7, next.getDuration()); 171 172 //set optional context information on the event. 173 nullSafeSet(insert, 8, next.getUserId(), Types.VARCHAR); 174 nullSafeSet(insert, 9, next.getSessionId(), Types.VARCHAR); 175 nullSafeSet(insert, 10, next.getError(), Types.VARCHAR); 176 177 //convert event data to XML 178 String data = dataSerializer.serialize(next.getData(), true); 179 nullSafeSet(insert, 11, data, Types.VARCHAR); 180 181 if (doBatch) { 182 insert.addBatch(); 183 if (++batchCount >= batchSize) { 184 insert.executeBatch(); 185 batchCount = 0; 186 } 187 } else { 188 insert.executeUpdate(); 189 } 190 191 ++tally; 192 } 193 194 //flush any lingering batch inserts through to the server. 195 if (batchCount > 0) 196 insert.executeBatch(); 197 198 } finally { 199 closeStatement(idQuery); 200 } 201 202 } finally { 203 closeStatement(insert); 204 } 205 206 return tally; 207 208 } finally { 209 if (bt != null) 210 bt.resumeTracking(); 211 } 212 } 213 214 }); 215 } 216 217 return count; 218 } 219 220 private void assignIds(BehaviorEvent event, PreparedStatement idQuery) throws SQLException { 221 if (event.getId() != null) 222 return; 223 224 BehaviorEvent parent = event.getParent(); 225 if (parent != null && parent.getId() == null) 226 assignIds(parent, idQuery); 227 228 if (event.getId() == null) 229 event.setId(nextId(idQuery)); 230 } 231 232 protected Long nextId(PreparedStatement idQuery) throws SQLException { 233 ResultSet rs = idQuery.executeQuery(); 234 try { 235 rs.next(); 236 return rs.getLong(1); 237 } finally { 238 closeResultSet(rs); 239 } 240 } 241 242 protected void nullSafeSet(PreparedStatement stmt, int index, Object value, int sqlType) 243 throws SQLException 244 { 245 if (value == null) 246 stmt.setNull(index, sqlType); 247 else 248 stmt.setObject(index, value, sqlType); 249 } 250 251 }