Commit 4853845d authored by Taddeüs Kroes's avatar Taddeüs Kroes

Add exception handler and clean up code

parent 3ba9bcff
...@@ -8,31 +8,37 @@ import json ...@@ -8,31 +8,37 @@ import json
from argparse import ArgumentParser from argparse import ArgumentParser
from Queue import PriorityQueue from Queue import PriorityQueue
interval = 2 # poll exchange tickers every 2 seconds # TODO: try round-robin queue instead of priority queue
cap_interval = 60 # poll check market cap every minute # TODO: log update times to evaluate queue type
exchanges = { exchanges = {
'https://btc-e.com/api/3/ticker/btc_usd': 'https://btc-e.com/api/3/ticker/btc_usd':
['btce', ('btc_usd', 'last'), 'USD', interval], ('btce', ('btc_usd', 'last'), 'USD'),
'https://api.gdax.com/products/BTC-USD/ticker': 'https://api.gdax.com/products/BTC-USD/ticker':
['coinbase', ('price',), 'USD', interval], ('coinbase', ('price',), 'USD'),
'https://www.bitstamp.net/api/v2/ticker/btcusd/': 'https://www.bitstamp.net/api/v2/ticker/btcusd/':
['bitstamp', ('last',), 'USD', interval], ('bitstamp', ('last',), 'USD'),
'https://api.bitfinex.com/v1/pubticker/btcusd': 'https://api.bitfinex.com/v1/pubticker/btcusd':
['bitfinex', ('last_price',), 'USD', interval], ('bitfinex', ('last_price',), 'USD'),
'https://data.btcchina.com/data/ticker?market=btccny': 'https://data.btcchina.com/data/ticker?market=btccny':
['btcchina', ('ticker', 'last'), 'CNY', interval], ('btcchina', ('ticker', 'last'), 'CNY'),
'http://api.huobi.com/staticmarket/ticker_btc_json.js': 'http://api.huobi.com/staticmarket/ticker_btc_json.js':
['huobi', ('ticker', 'last'), 'CNY', interval], ('huobi', ('ticker', 'last'), 'CNY'),
'https://www.okcoin.com/api/v1/ticker.do?symbol=btc_usd': 'https://www.okcoin.com/api/v1/ticker.do?symbol=btc_usd':
['okcoin', ('ticker', 'last'), 'CNY', interval], ('okcoin', ('ticker', 'last'), 'CNY'),
'https://api.kraken.com/0/public/Ticker?pair=XXBTZEUR': 'https://api.kraken.com/0/public/Ticker?pair=XXBTZEUR':
['kraken', ('result', 'XXBTZEUR', 'c', 0), 'EUR', interval], ('kraken', ('result', 'XXBTZEUR', 'c', 0), 'EUR'),
}
marketcaps = {
'https://api.coinmarketcap.com/v1/ticker/bitcoin/': 'https://api.coinmarketcap.com/v1/ticker/bitcoin/':
['coinmarketcap', (0, 'market_cap_usd'), 'USD', cap_interval], ('coinmarketcap', (0, 'market_cap_usd'), 'USD'),
} }
queue = PriorityQueue(len(exchanges)) status = {}
queue = PriorityQueue(len(exchanges) + len(marketcaps))
def generate_requests(): def generate_requests():
assert queue.empty() assert queue.empty()
...@@ -40,15 +46,26 @@ def generate_requests(): ...@@ -40,15 +46,26 @@ def generate_requests():
for url in exchanges.iterkeys(): for url in exchanges.iterkeys():
queue.put((0, url)) queue.put((0, url))
for url in marketcaps.iterkeys():
queue.put((0, url))
while True: while True:
scheduled, url = queue.get() scheduled, url = queue.get()
wait = scheduled - time.time() wait = scheduled - time.time()
if wait > 0: if wait > 0:
time.sleep(wait) interval = status[url][1][1]
time.sleep(min(wait, interval))
yield grequests.get(url, timeout=5) yield grequests.get(url, timeout=5)
def requeue_after_error(req, e):
print >>sys.stderr, 'Error for %s:' % req.url, e.message
status[req.url][1][0] += 0.5
queue.put((status[req.url][1][0], req.url))
if __name__ == '__main__': if __name__ == '__main__':
# parse command line options # parse command line options
parser = ArgumentParser(description='Poll bitcoin API services.') parser = ArgumentParser(description='Poll bitcoin API services.')
...@@ -56,36 +73,35 @@ if __name__ == '__main__': ...@@ -56,36 +73,35 @@ if __name__ == '__main__':
default='/dev/shm/tothemoon', default='/dev/shm/tothemoon',
help='directory to save poll results in ' help='directory to save poll results in '
'(default dev/shm/tothemoon)') '(default dev/shm/tothemoon)')
parser.add_argument('-i', '--interval', metavar='SECONDS', type=float, parser.add_argument('-i', '--interval', metavar='SECONDS=2', type=float,
default=interval, default=2, help='exchange poll interval')
help='poll interval per exchange in seconds ' parser.add_argument('-c', '--cap-interval', metavar='SECONDS=60', type=float,
'(default %f)' % interval) default=60, help='market cap poll interval')
parser.add_argument('-l', '--list', action='store_true', default=False, parser.add_argument('-l', '--list', action='store_true', default=False,
help='list exchange info') help='list api info')
args = parser.parse_args() args = parser.parse_args()
root = args.dir root = args.dir
for ex in exchanges.itervalues(): for url, ex in exchanges.iteritems():
if ex[0] != 'coinmarketcap': status[url] = ex, [0, args.interval]
ex[3] = args.interval for url, cap in marketcaps.iteritems():
status[url] = cap, [0, args.cap_interval]
if args.list: if args.list:
for url, (exid, indices, currency, interval) in exchanges.iteritems(): for url, ((exid, indices, currency), stat) in status.iteritems():
print exid print exid
print ' url: ', url print ' url: ', url
print ' currency:', currency print ' currency:', currency
print ' lookup: response[%s]' % ']['.join(repr(i) for i in indices) print ' lookup: response[%s]' % ']['.join(repr(i) for i in indices)
print ' interval:', interval, 'seconds'
print ' path: %s/%s' % (root, exid)
sys.exit(0) sys.exit(0)
create_root = not os.path.isdir(root) create_root = not os.path.isdir(root)
# clean up created files on exit # clean up created files on exit
def remove_files(): def remove_files():
for ex in exchanges.itervalues(): for (exid, indices, currency), stat in status.itervalues():
path = root + '/' + ex[0] path = root + '/' + exid
if os.path.exists(path): if os.path.exists(path):
os.remove(path) os.remove(path)
...@@ -106,9 +122,12 @@ if __name__ == '__main__': ...@@ -106,9 +122,12 @@ if __name__ == '__main__':
# keep updating URLs from priority queue, adding them back to the queue # keep updating URLs from priority queue, adding them back to the queue
# scheduled after their current interval # scheduled after their current interval
try: try:
for res in grequests.imap(generate_requests()): for res in grequests.imap(generate_requests(),
exid, indices, currency, interval = exchanges[res.request.url] exception_handler=requeue_after_error):
queue.put((time.time() + interval, res.request.url)) (exid, indices, currency), stat = status[res.request.url]
interval = stat[1]
stat[0] = scheduled = time.time() + interval
queue.put((scheduled, res.request.url))
last = res.json() last = res.json()
for i in indices: for i in indices:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment