001    /* 
002     * Copyright 2008-2009 the original author or authors.
003     * The contents of this file are subject to the Mozilla Public License
004     * Version 1.1 (the "License"); you may not use this file except in
005     * compliance with the License. You may obtain a copy of the License at
006     * http://www.mozilla.org/MPL/
007     *
008     * Software distributed under the License is distributed on an "AS IS"
009     * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
010     * License for the specific language governing rights and limitations
011     * under the License.
012     */
013     
014    package com.mtgi.analytics;
015    
016    import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
017    import static org.springframework.jdbc.support.JdbcUtils.closeStatement;
018    import static org.springframework.jdbc.support.JdbcUtils.supportsBatchUpdates;
019    
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Types;
025    import java.util.Queue;
026    
027    import javax.xml.stream.XMLOutputFactory;
028    
029    import org.springframework.dao.DataAccessException;
030    import org.springframework.jdbc.core.ConnectionCallback;
031    import org.springframework.jdbc.core.support.JdbcDaoSupport;
032    import org.springframework.jdbc.datasource.ConnectionProxy;
033    
034    import com.mtgi.analytics.sql.BehaviorTrackingConnectionProxy;
035    
036    /**
037     * Basic implementation of {@link BehaviorEventPersister}, which uses JDBC
038     * batching to persist instances of {@link BehaviorEvent} using configurable
039     * insert SQL.  An instance of {@link EventDataElementSerializer} is used
040     * to convert event data to XML documents for insertion.
041     */
042    public class JdbcBehaviorEventPersisterImpl extends JdbcDaoSupport
043            implements BehaviorEventPersister {
044    
045            private static final String DEFAULT_ID_SQL = "select SEQ_BEHAVIOR_TRACKING_EVENT.nextval from dual";
046            private static final String DEFAULT_INSERT_SQL = 
047                    "insert into BEHAVIOR_TRACKING_EVENT " +
048                            "(EVENT_ID, PARENT_EVENT_ID, APPLICATION, EVENT_TYPE, EVENT_NAME, EVENT_START, DURATION_MS, USER_ID, SESSION_ID, ERROR, EVENT_DATA) values " +
049                            "(       ?,               ?,           ?,          ?,           ?,          ?,           ?,       ?,          ?,     ?,          ?)";
050            
051            private int batchSize = 25;
052            private String insertSql = DEFAULT_INSERT_SQL;
053            private String idSql = DEFAULT_ID_SQL;
054            
055            private XMLOutputFactory xmlFactory;
056    
057            /**
058             * Set the JDBC batch size for executing inserts.  Only has effect if the JDBC driver
059             * supports statement batching. Defaults to 25 if unspecified.
060             */
061            public void setBatchSize(int batchSize) {
062                    this.batchSize = batchSize;
063            }
064    
065            /**
066             * Set the SQL select statement used to retrieve a new primary key value for an event
067             * prior to insert.  Default is
068             * <pre>select S_BEHAVIOR_TRACKING_EVENT.nextval from dual</pre>
069             * if unspecified.
070             */
071            public void setIdSql(String idSql) {
072                    this.idSql = idSql;
073            }
074    
075            public String getIdSql() {
076                    return idSql;
077            }
078            
079            /**
080             * Set the SQL statement used to insert a new behavior event record into the database.
081             * The SQL statement must take exactly 11 parameters, which must accept the following 
082             * event values in order:
083             * <ol>
084             * <li>EVENT ID</li> 
085             * <li>PARENT EVENT ID</li> 
086             * <li>APPLICATION</li> 
087             * <li>TYPE</li> 
088             * <li>NAME</li> 
089             * <li>START</li> 
090             * <li>DURATION</li> 
091             * <li>USER ID</li> 
092             * <li>SESSION ID</li> 
093             * <li>ERROR</li> 
094             * <li>DATA</li>
095             * </ol>
096             * 
097             * Default is 
098             * <pre>insert into BEHAVIOR_TRACKING_EVENT
099             *  (EVENT_ID, PARENT_EVENT_ID, APPLICATION, TYPE, NAME, START, DURATION_MS, USER_ID, SESSION_ID, ERROR, DATA) 
100             *   values 
101             *  (       ?,               ?,           ?,    ?,    ?,     ?,           ?,       ?,          ?,     ?,    ?)
102             * </pre>
103             */
104            public void setInsertSql(String insertSql) {
105                    this.insertSql = insertSql;
106            }
107    
108            @Override
109            protected void initDao() throws Exception {
110                    super.initDao();
111                    this.xmlFactory = XMLOutputFactory.newInstance();
112            }
113    
114            public int persist(final Queue<BehaviorEvent> events) {
115    
116                    int count = 0;
117                    if (!events.isEmpty()) {
118                    
119                            count = (Integer)getJdbcTemplate().execute(new ConnectionCallback() {
120            
121                                    public Object doInConnection(Connection con) throws SQLException, DataAccessException {
122            
123                                            //if this connection is behavior tracking, suspend tracking.
124                                            //we don't generate more events while persisting.
125                                            BehaviorTrackingConnectionProxy bt = null;
126                                            for (Connection c = con; 
127                                                     bt == null && c instanceof ConnectionProxy; 
128                                                     c = ((ConnectionProxy)c).getTargetConnection()) 
129                                            {
130                                                    if (c instanceof BehaviorTrackingConnectionProxy) {
131                                                            bt = (BehaviorTrackingConnectionProxy)c;
132                                                            bt.suspendTracking();
133                                                    }
134                                            }
135                                            
136                                            try {
137                                                    int tally = 0; //return total events persisted.
138                                                    
139                                                    boolean doBatch = supportsBatchUpdates(con);
140                                                    EventDataElementSerializer dataSerializer = new EventDataElementSerializer(xmlFactory);
141            
142                                                    PreparedStatement insert = con.prepareStatement(insertSql);
143                                                    try {
144                                                            
145                                                            PreparedStatement idQuery = con.prepareStatement(idSql);
146                                                            try {
147                                                                    
148                                                                    //keep track of statements added to the batch so that we can time our
149                                                                    //flushes.
150                                                                    int batchCount = 0;
151                                                                    
152                                                                    //go until the queue is drained.
153                                                                    while (!events.isEmpty()) {
154                                                                            
155                                                                            //pop the next event off of the queue.  event may already have an ID assigned if any
156                                                                            //of its child events has been persisted.
157                                                                            BehaviorEvent next = events.remove();
158                                                                            assignIds(next, idQuery);
159    
160                                                                            //populate identifying information for the event into the insert statement.
161                                                                            insert.setLong(1, (Long)next.getId());
162                                                                            
163                                                                            BehaviorEvent parent = next.getParent();
164                                                                            nullSafeSet(insert, 2, parent == null ? null : parent.getId(), Types.BIGINT);
165    
166                                                                            insert.setString(3, next.getApplication());
167                                                                            insert.setString(4, next.getType());
168                                                                            insert.setString(5, next.getName());
169                                                                            insert.setTimestamp(6, new java.sql.Timestamp(next.getStart().getTime()));
170                                                                            insert.setLong(7, next.getDuration());
171            
172                                                                            //set optional context information on the event.
173                                                                            nullSafeSet(insert, 8, next.getUserId(), Types.VARCHAR);
174                                                                            nullSafeSet(insert, 9, next.getSessionId(), Types.VARCHAR);
175                                                                            nullSafeSet(insert, 10, next.getError(), Types.VARCHAR);
176                    
177                                                                            //convert event data to XML
178                                                                            String data = dataSerializer.serialize(next.getData(), true);
179                                                                            nullSafeSet(insert, 11, data, Types.VARCHAR);
180            
181                                                                            if (doBatch) {
182                                                                                    insert.addBatch();
183                                                                                    if (++batchCount >= batchSize) {
184                                                                                            insert.executeBatch();
185                                                                                            batchCount = 0;
186                                                                                    }
187                                                                            } else {
188                                                                                    insert.executeUpdate();
189                                                                            }
190            
191                                                                            ++tally;
192                                                                    }
193            
194                                                                    //flush any lingering batch inserts through to the server.
195                                                                    if (batchCount > 0)
196                                                                            insert.executeBatch();
197                                                                    
198                                                            } finally {
199                                                                    closeStatement(idQuery);
200                                                            }
201                                                            
202                                                    } finally {
203                                                            closeStatement(insert);
204                                                    }
205                                                    
206                                                    return tally;
207                                                    
208                                            } finally {
209                                                    if (bt != null)
210                                                            bt.resumeTracking();
211                                            }
212                                    }
213                                    
214                            });
215                    }
216                    
217                    return count;
218            }
219            
220            private void assignIds(BehaviorEvent event, PreparedStatement idQuery) throws SQLException {
221                    if (event.getId() != null)
222                            return;
223                    
224                    BehaviorEvent parent = event.getParent();
225                    if (parent != null && parent.getId() == null)
226                            assignIds(parent, idQuery);
227                    
228                    if (event.getId() == null)
229                            event.setId(nextId(idQuery));
230            }
231            
232            protected Long nextId(PreparedStatement idQuery) throws SQLException {
233                    ResultSet rs = idQuery.executeQuery();
234                    try {
235                            rs.next();
236                            return rs.getLong(1);
237                    } finally {
238                            closeResultSet(rs);
239                    }
240            }
241            
242            protected void nullSafeSet(PreparedStatement stmt, int index, Object value, int sqlType) 
243                    throws SQLException
244            {
245                    if (value == null)
246                            stmt.setNull(index, sqlType);
247                    else
248                            stmt.setObject(index, value, sqlType);
249            }
250            
251    }