first commit
This commit is contained in:
46
src/database/questdb_client.py
Normal file
46
src/database/questdb_client.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import requests
|
||||
import time
|
||||
from typing import List
|
||||
from ..exchanges.base import Trade
|
||||
|
||||
class DatabaseClient:
|
||||
def __init__(self, host: str = "localhost", port: int = 9000):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.url = f"http://{host}:{port}/write"
|
||||
|
||||
def save_trades(self, trades: List[Trade]):
|
||||
if not trades:
|
||||
return
|
||||
|
||||
lines = []
|
||||
for trade in trades:
|
||||
# QuestDB Influx Line Protocol format:
|
||||
# table_name,tag1=val1,tag2=val2 field1=val1,field2=val2 timestamp
|
||||
# We use microseconds for timestamp (nanoseconds is standard for ILP)
|
||||
|
||||
# Clean symbols for ILP
|
||||
symbol = trade.symbol.replace(" ", "\\ ").replace(",", "\\,")
|
||||
exchange = trade.exchange
|
||||
|
||||
line = f"trades,exchange={exchange},symbol={symbol},isin={trade.isin} " \
|
||||
f"price={trade.price},quantity={trade.quantity} " \
|
||||
f"{int(trade.timestamp.timestamp() * 1e9)}"
|
||||
lines.append(line)
|
||||
|
||||
payload = "\n".join(lines) + "\n"
|
||||
|
||||
try:
|
||||
response = requests.post(self.url, data=payload, params={'precision': 'ns'})
|
||||
if response.status_code != 204:
|
||||
print(f"Error saving to QuestDB: {response.text}")
|
||||
except Exception as e:
|
||||
print(f"Could not connect to QuestDB at {self.url}: {e}")
|
||||
# Fallback: print to console or save to file
|
||||
self._fallback_save(trades)
|
||||
|
||||
def _fallback_save(self, trades: List[Trade]):
|
||||
# Just log to a file for now if QuestDB is not available
|
||||
with open("trades_fallback.log", "a") as f:
|
||||
for t in trades:
|
||||
f.write(f"{t.timestamp} | {t.exchange} | {t.symbol} | {t.price} | {t.quantity}\n")
|
||||
Reference in New Issue
Block a user