import time
import logging
import datetime
import os

import requests

from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.database.questdb_client import DatabaseClient

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("TradingDaemon")

# Optional HTTP basic-auth credentials for the QuestDB REST API.
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None

# Single source of truth for the QuestDB HTTP endpoint (was duplicated inline).
QUESTDB_URL = "http://questdb:9000"

# Timeout for QuestDB REST calls; without it, requests can block the daemon forever.
HTTP_TIMEOUT = 10


def get_last_trade_timestamp(db_url, exchange_name):
    """Return the timestamp of the newest trade stored for *exchange_name*.

    Queries QuestDB's ``/exec`` REST endpoint. If the table does not exist
    yet, the exchange has no rows, or the database is unreachable, a
    timezone-aware ``datetime.min`` (UTC) is returned so every fetched trade
    counts as new.

    Args:
        db_url: Base URL of the QuestDB HTTP endpoint.
        exchange_name: Value of the ``exchange`` column to filter on.

    Returns:
        A timezone-aware ``datetime`` (UTC).
    """
    # QuestDB's /exec endpoint has no bound parameters, so the value is
    # embedded in the SQL text; escape single quotes so a name containing
    # a quote cannot break (or inject into) the query.
    safe_name = exchange_name.replace("'", "''")
    # QuestDB query: latest row per designated timestamp for this exchange.
    query = f"trades where exchange = '{safe_name}' latest by timestamp"
    try:
        response = requests.get(
            f"{db_url}/exec",
            params={'query': query},
            auth=DB_AUTH,
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            data = response.json()
            if data['dataset']:
                # NOTE(review): assumes the designated timestamp is the first
                # column of the result row — confirm against the table schema.
                ts_value = data['dataset'][0][0]
                if isinstance(ts_value, str):
                    # ISO-8601 string; normalize the 'Z' suffix for fromisoformat.
                    return datetime.datetime.fromisoformat(
                        ts_value.replace('Z', '+00:00')
                    )
                # Otherwise: microseconds since the Unix epoch.
                return datetime.datetime.fromtimestamp(
                    ts_value / 1_000_000, tz=datetime.timezone.utc
                )
    except Exception as e:
        # Best-effort lookup: missing table / unreachable DB falls through
        # to the sentinel below so the caller treats everything as new.
        logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)


def _as_utc(ts):
    """Attach UTC to a naive datetime; leave aware datetimes untouched.

    The original code force-stamped UTC with ``replace()`` even on aware
    timestamps, which would silently mislabel non-UTC-aware values.
    NOTE(review): assumes naive trade timestamps are UTC — confirm against
    the exchange implementations.
    """
    if ts.tzinfo is None:
        return ts.replace(tzinfo=datetime.timezone.utc)
    return ts


def run_task(historical=False):
    """Fetch trades from all exchanges and store only the new ones.

    Args:
        historical: When True, request the full available history from each
            exchange; otherwise fetch only the most recent data.
    """
    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")

    # Initialize exchanges
    eix = EIXExchange()
    ls = LSExchange()
    # Each entry pairs an exchange with the kwargs for fetch_latest_trades().
    exchanges_to_process = [
        (eix, {'limit': None if historical else 1}),
        (ls, {'include_yesterday': historical}),
    ]

    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)

    # BUG FIX: the original iterated over the undefined name `exchanges` and
    # called fetch_latest_trades(**args) with an undefined `args` — unpack
    # the (exchange, kwargs) pairs built above instead.
    for exchange, fetch_kwargs in exchanges_to_process:
        try:
            last_ts = get_last_trade_timestamp(QUESTDB_URL, exchange.name)
            logger.info(
                f"Fetching data from {exchange.name} "
                f"(Filtering trades older than {last_ts})..."
            )
            trades = exchange.fetch_latest_trades(**fetch_kwargs)

            # Deduplication: keep only trades strictly newer than the latest
            # one already stored in the DB.
            cutoff = _as_utc(last_ts)
            new_trades = [t for t in trades if _as_utc(t.timestamp) > cutoff]

            logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")

            if new_trades:
                # Sort trades by timestamp before saving (QuestDB likes this)
                new_trades.sort(key=lambda x: x.timestamp)
                db.save_trades(new_trades)
                logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
        except Exception as e:
            # One failing exchange must not abort the others.
            logger.error(f"Error processing exchange {exchange.name}: {e}")


def main():
    """Entry point: one-time historical backfill if needed, then daily runs."""
    logger.info("Trading Daemon started.")

    # 1. Startup check: is the database empty?
    is_empty = True
    try:
        # Check whether the trades table already holds any rows.
        response = requests.get(
            f"{QUESTDB_URL}/exec",
            params={'query': 'select count(*) from trades'},
            auth=DB_AUTH,
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            data = response.json()
            if data['dataset'] and data['dataset'][0][0] > 0:
                is_empty = False
    except Exception:
        # Table does not exist yet, or the DB is unreachable: treat as empty.
        is_empty = True

    if is_empty:
        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
        run_task(historical=True)
    else:
        logger.info("Found existing data in database. Waiting for scheduled run at 23:00.")

    # 2. Scheduler loop: run once per day at 23:00 local time.
    while True:
        now = datetime.datetime.now()
        if now.hour == 23 and now.minute == 0:
            run_task(historical=False)
            # Sleep past the minute boundary to prevent a second run
            # within the same minute.
            time.sleep(61)
        # Poll every 30 seconds.
        time.sleep(30)


if __name__ == "__main__":
    main()