-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecord_prediction_data.py
More file actions
160 lines (131 loc) · 6.34 KB
/
record_prediction_data.py
File metadata and controls
160 lines (131 loc) · 6.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-
"""
Created on Thu Jul 16 10:32:08 2015
@author: ennever
Ping nextbus API every 30s for a particular MBTA bus stop and direction
"""
import requests, time, os
import xml.etree.ElementTree as ET
import MySQLdb as mdb
import sys
from requests.exceptions import ConnectionError
import pandas as pd
class nextbus_query:
    """Poll the NextBus predictions API for one stop and record the results.

    Each prediction becomes a row (Stop_ID, Vehicle, Query_Time,
    Predicted_Time, Query_Day), written either directly into a MySQL table
    or appended to a plain-text logfile of INSERT statements that
    read_query_logfile() can replay into MySQL later.
    Times are epoch milliseconds; Query_Day is the weekday digit ('%w').
    """

    def __init__(self, route='1', stopID='0074', agency='mbta',
                 table='Data_Table_1', db='nextbus_1_0074'):
        """Configure the query target.

        route   -- named NextBus route
        stopID  -- NextBus stop ID; default is Albany Street towards Dudley
        agency  -- NextBus agency code
        table   -- MySQL table to write to
        db      -- MySQL database name
        """
        self.route = route
        self.stopID = stopID
        self.agency = agency
        self.table = table
        self.db = db
        # NOTE(review): credentials are hard-coded; move to config/env.
        self.username = 'ennever'
        self.password = 'ennever123'
        self.url = 'http://webservices.nextbus.com/service/publicXMLFeed'
        # Parameter set sent with every API request.
        self.payload = {'command': 'predictions', 'a': self.agency,
                        'r': self.route, 'stopId': self.stopID}

    def create_table(self, table=None, curtype=None, dropif=False):
        """Create the recording table; optionally DROP an existing one first.

        curtype is accepted for backward compatibility but unused.
        Updates self.table to the table actually created.
        """
        if table is None:  # fixed: identity test instead of '== None'
            table = self.table
        con = self.connect_db()
        with con:
            cur = con.cursor(mdb.cursors.DictCursor)
            if dropif:
                # Table names cannot be bound as query parameters.
                cur.execute("DROP TABLE IF EXISTS " + table)
            cur.execute("CREATE TABLE " + table +
                        "(Id INT PRIMARY KEY AUTO_INCREMENT, "
                        "Stop_ID INT, Vehicle INT, Query_Time BIGINT, "
                        "Predicted_Time BIGINT, Query_Day CHAR(10))")
        self.table = table

    def connect_db(self):
        """Open a new connection to the local MySQL server."""
        return mdb.connect('localhost', self.username, self.password, self.db)

    def query_nb_api(self):
        """Fetch predictions from the API into self.predictions.

        Returns 1 when at least one prediction was parsed, 0 on network
        failure, XML parse failure, or an empty prediction list.
        """
        try:
            r = requests.get(self.url, params=self.payload)
        except ConnectionError as e:
            print(e)
            return 0
        try:
            root = ET.fromstring(r.text)
        except ET.ParseError as pe:
            print(pe)
            return 0
        self.predictions = root.findall('./predictions/direction/prediction')
        return 1 if self.predictions else 0

    def _prediction_rows(self):
        """Yield one (stop, vehicle, query_ms, predicted_ms, weekday) tuple
        of strings per prediction in self.predictions."""
        for prediction in self.predictions:
            predictiontime = prediction.get('epochTime')
            seconds = prediction.get('seconds')
            vehicle = prediction.get('vehicle')
            # Query time is the prediction time minus the countdown (ms).
            querytime = str(int(predictiontime) - int(seconds) * 1000)
            day = time.strftime('%w', time.localtime(1e-3 * int(querytime)))
            yield (str(self.stopID), str(vehicle), querytime,
                   str(predictiontime), str(day))

    def record_query(self, debug=False, logfile=''):
        """Record self.predictions to MySQL, or to `logfile` if given.

        Logfile lines are complete INSERT statements (format preserved for
        existing logfiles / read_query_logfile). DB inserts are
        parameterized so API-supplied values cannot inject SQL.
        """
        column_headers = ' (Stop_ID, Vehicle, Query_Time, Predicted_Time, Query_Day) '
        if logfile != '':  # write to file rather than to mysql database
            with open(logfile, 'a') as f:
                for row in self._prediction_rows():
                    values = 'VALUES(' + ', '.join(row) + ')'
                    f.write("INSERT INTO " + self.table + column_headers + values)
                    f.write('\n')
                    if debug:
                        print(values)
        else:
            con = self.connect_db()
            with con:
                cur = con.cursor(mdb.cursors.DictCursor)
                for row in self._prediction_rows():
                    if debug:
                        print(row)
                    cur.execute("INSERT INTO " + self.table + column_headers +
                                "VALUES(%s, %s, %s, %s, %s)", row)

    def read_queries(self):
        """Return every recorded row as a pandas DataFrame, ordered by Vehicle."""
        con = self.connect_db()
        try:
            rows = pd.read_sql("SELECT * FROM " + self.table +
                               " ORDER BY Vehicle;", con=con)
        finally:
            con.close()  # fixed: connection was previously never closed
        return rows

    def read_query_logfile(self, filename):
        """Replay a logfile written by record_query() into MySQL.

        Each line already is a complete INSERT statement, so it is executed
        as-is. (Previously the code prepended 'INSERT INTO <table> (cols)'
        a second time, producing invalid SQL.)
        """
        with open(filename, 'r') as f:
            con = self.connect_db()
            with con:
                cur = con.cursor(mdb.cursors.DictCursor)
                for line in f:
                    cur.execute(line.rstrip('\n'))

    def start_queries(self, interval=30, duration=18, logfile=''):
        """Query every `interval` seconds for `duration` hours, recording
        each successful query via record_query()."""
        # Integer division: range() requires an int (true division under Py3).
        nqueries = (duration * 3600) // interval
        print('Querying Nextbus Agency: ' + self.agency + ', Route: ' +
              self.route + ', Stop: ' + self.stopID)
        print('Recording to MySQL DB: ' + self.db + ', Table: ' + self.table)
        print('Recording for ' + str(duration) + ' hours')
        print('Began at: ' + time.strftime("%a, %d %b %Y %H:%M:%S",
                                           time.localtime()))
        for i in range(nqueries):
            message = 'Current iteration: ' + str(i) + ' of ' + str(nqueries)
            sys.stdout.write('\r' + message)
            sys.stdout.flush()
            if i != 0:
                time.sleep(interval)
            if self.query_nb_api() == 1:
                self.record_query(logfile=logfile)
#nbq = nextbus_query()
#nbq.create_table(dropif = True)
#nbq.query_nb_api()
#nbq.record_query(debug = True)
#nbq.start_queries(duration = 12)
#rows = nbq.read_queries()
#for row in rows[-20:]:
# print row['Vehicle'], time.strftime('%H:%M:%S', time.localtime(1e-3*row['Predicted_Time'])),