Commit a7a55962 authored by Taddeüs Kroes's avatar Taddeüs Kroes

Simplify poll script (remove priority queue) so that it does not crash anymore

parent 8c460d36
...@@ -6,64 +6,31 @@ import time ...@@ -6,64 +6,31 @@ import time
import grequests import grequests
import json import json
from argparse import ArgumentParser from argparse import ArgumentParser
from Queue import PriorityQueue
# TODO: try round-robin queue instead of priority queue exchanges = [
# TODO: log update times to evaluate queue type ('https://btc-e.com/api/3/ticker/btc_usd',
'btce', ('btc_usd', 'last'), 'USD'),
exchanges = { ('https://api.gdax.com/products/BTC-USD/ticker',
'https://btc-e.com/api/3/ticker/btc_usd': 'coinbase', ('price',), 'USD'),
('btce', ('btc_usd', 'last'), 'USD'), ('https://www.bitstamp.net/api/v2/ticker/btcusd/',
'https://api.gdax.com/products/BTC-USD/ticker': 'bitstamp', ('last',), 'USD'),
('coinbase', ('price',), 'USD'), ('https://api.bitfinex.com/v1/pubticker/btcusd',
'https://www.bitstamp.net/api/v2/ticker/btcusd/': 'bitfinex', ('last_price',), 'USD'),
('bitstamp', ('last',), 'USD'), ('https://data.btcchina.com/data/ticker?market=btccny',
'https://api.bitfinex.com/v1/pubticker/btcusd': 'btcchina', ('ticker', 'last'), 'CNY'),
('bitfinex', ('last_price',), 'USD'), ('http://api.huobi.com/staticmarket/ticker_btc_json.js',
'https://data.btcchina.com/data/ticker?market=btccny': 'huobi', ('ticker', 'last'), 'CNY'),
('btcchina', ('ticker', 'last'), 'CNY'), ('https://www.okcoin.com/api/v1/ticker.do?symbol=btc_usd',
'http://api.huobi.com/staticmarket/ticker_btc_json.js': 'okcoin', ('ticker', 'last'), 'CNY'),
('huobi', ('ticker', 'last'), 'CNY'), ('https://api.kraken.com/0/public/Ticker?pair=XXBTZEUR',
'https://www.okcoin.com/api/v1/ticker.do?symbol=btc_usd': 'kraken', ('result', 'XXBTZEUR', 'c', 0), 'EUR'),
('okcoin', ('ticker', 'last'), 'CNY'), ('https://api.coinmarketcap.com/v1/ticker/bitcoin/',
'https://api.kraken.com/0/public/Ticker?pair=XXBTZEUR': 'coinmarketcap', (0, 'market_cap_usd'), 'USD'),
('kraken', ('result', 'XXBTZEUR', 'c', 0), 'EUR'), ]
}
marketcaps = { def handle_error(req, e):
'https://api.coinmarketcap.com/v1/ticker/bitcoin/':
('coinmarketcap', (0, 'market_cap_usd'), 'USD'),
}
status = {}
queue = PriorityQueue(len(exchanges) + len(marketcaps))
def generate_requests():
assert queue.empty()
for url in exchanges.iterkeys():
queue.put((0, url))
for url in marketcaps.iterkeys():
queue.put((0, url))
while True:
scheduled, url = queue.get()
wait = scheduled - time.time()
if wait > 0:
interval = status[url][1][1]
time.sleep(min(wait, interval))
yield grequests.get(url, timeout=5)
def requeue_after_error(req, e):
print >>sys.stderr, 'Error:', e.message print >>sys.stderr, 'Error:', e.message
status[req.url][1][0] += 0.5
queue.put((status[req.url][1][0], req.url))
if __name__ == '__main__': if __name__ == '__main__':
...@@ -75,21 +42,14 @@ if __name__ == '__main__': ...@@ -75,21 +42,14 @@ if __name__ == '__main__':
'(default dev/shm/tothemoon)') '(default dev/shm/tothemoon)')
parser.add_argument('-i', '--interval', metavar='SECONDS=2', type=float, parser.add_argument('-i', '--interval', metavar='SECONDS=2', type=float,
default=2, help='exchange poll interval') default=2, help='exchange poll interval')
parser.add_argument('-c', '--cap-interval', metavar='SECONDS=60', type=float,
default=60, help='market cap poll interval')
parser.add_argument('-l', '--list', action='store_true', default=False, parser.add_argument('-l', '--list', action='store_true', default=False,
help='list api info') help='list api info')
args = parser.parse_args() args = parser.parse_args()
root = args.dir root = args.dir
for url, ex in exchanges.iteritems():
status[url] = ex, [0, args.interval]
for url, cap in marketcaps.iteritems():
status[url] = cap, [0, args.cap_interval]
if args.list: if args.list:
for url, ((exid, indices, currency), stat) in status.iteritems(): for url, exid, indices, currency in exchanges:
print exid print exid
print ' url: ', url print ' url: ', url
print ' currency:', currency print ' currency:', currency
...@@ -100,7 +60,7 @@ if __name__ == '__main__': ...@@ -100,7 +60,7 @@ if __name__ == '__main__':
# clean up created files on exit # clean up created files on exit
def remove_files(): def remove_files():
for (exid, indices, currency), stat in status.itervalues(): for url, exid, indices, currency in exchanges:
path = root + '/' + exid path = root + '/' + exid
if os.path.exists(path): if os.path.exists(path):
os.remove(path) os.remove(path)
...@@ -122,24 +82,42 @@ if __name__ == '__main__': ...@@ -122,24 +82,42 @@ if __name__ == '__main__':
# keep updating URLs from priority queue, adding them back to the queue # keep updating URLs from priority queue, adding them back to the queue
# scheduled after their current interval # scheduled after their current interval
try: try:
for res in grequests.imap(generate_requests(), last_update = 0
exception_handler=requeue_after_error):
(exid, indices, currency), stat = status[res.request.url] while True:
interval = stat[1] diff = time.time() - last_update
stat[0] = scheduled = time.time() + interval if diff < args.interval:
queue.put((scheduled, res.request.url)) time.sleep(args.interval - diff)
try: requests = (grequests.get(ex[0], timeout=args.interval * 1.5)
last = res.json() for ex in exchanges)
except ValueError as e: responses = grequests.map(requests, exception_handler=handle_error)
print >>sys.stderr, 'invalid response from %s:' % exid, e.message
continue for (url, exid, indices, currency), res in zip(exchanges, responses):
status = {'last': None, 'currency': currency, 'updated': 0}
for i in indices:
last = last[i] if res is None:
last = float(last) print >>sys.stderr, exid, 'unknown error'
elif res.status_code != 200:
with open(root + '/' + exid, 'w') as f: print >>sys.stderr, 'server error %d at %s' % (res.status_code, exid)
json.dump({'last': last, 'currency': currency}, f) else:
try:
last = res.json()
for i in indices:
last = last[i]
last = float(last)
status['updated'] = time.time()
status['last'] = last
except ValueError as e:
print >>sys.stderr, 'invalid response from %s:' % exid, e.message
except KeyError as e:
print >>sys.stderr, 'unexpected response content from %s:' % exid, res.text
with open(root + '/' + exid, 'w') as f:
json.dump(status, f)
last_update = time.time()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment