From 47a8ceab327e97efbc967f1f9fddfe9105ff62ca Mon Sep 17 00:00:00 2001
From: Melchior Reimers
Date: Fri, 23 Jan 2026 17:44:06 +0100
Subject: [PATCH] feat: add automatic historical fetch on startup if DB is empty

---
 daemon.py            | 46 +++++++++++++++++++++++++++++++++++---------
 src/exchanges/eix.py | 11 +++++------
 src/exchanges/ls.py  | 25 ++++++++++++------------
 3 files changed, 55 insertions(+), 27 deletions(-)

diff --git a/daemon.py b/daemon.py
index 391e90f..6426d27 100644
--- a/daemon.py
+++ b/daemon.py
@@ -38,13 +38,19 @@ def get_last_trade_timestamp(db_url, exchange_name):
         logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
         return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
 
-def run_task():
-    logger.info("Starting Trading Data Fetcher task...")
-    exchanges = [
-        EIXExchange(),
-        LSExchange()
+def run_task(historical=False):
+    logger.info(f"Starting Trading Data Fetcher task (Historical: {historical})...")
+
+    # Initialize exchanges
+    eix = EIXExchange()
+    ls = LSExchange()
+
+    exchanges_to_process = [
+        (eix, {'limit': 100 if historical else 1}),
+        (ls, {'include_yesterday': historical})
     ]
-    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
+
+    db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
 
-    for exchange in exchanges:
+    for exchange, args in exchanges_to_process:
         try:
@@ -52,7 +58,7 @@ def run_task():
             last_ts = get_last_trade_timestamp(db_url, exchange.name)
 
             logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...")
-            trades = exchange.fetch_latest_trades()
+            trades = exchange.fetch_latest_trades(**args)
 
             # Deduplizierung: Nur Trades nehmen, die neuer sind als der letzte in der DB
             new_trades = [
@@ -63,19 +69,41 @@ def run_task():
             logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
 
             if new_trades:
+                # Sort trades by timestamp before saving (QuestDB likes this)
+                new_trades.sort(key=lambda x: x.timestamp)
                 db.save_trades(new_trades)
                 logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
         except Exception as e:
             logger.error(f"Error processing exchange {exchange.name}: {e}")
 
 def main():
-    logger.info("Trading Daemon started. Waiting for 23:00 to run task.")
+    logger.info("Trading Daemon started.")
+
+    # 1. Startup Check: Ist die DB leer?
+    db_url = "http://questdb:9000"
+    is_empty = True
+    try:
+        # Prüfe ob bereits Trades in der Tabelle sind
+        response = requests.get(f"{db_url}/exec", params={'query': 'select count(*) from trades'}, auth=DB_AUTH)
+        if response.status_code == 200:
+            data = response.json()
+            if data['dataset'] and data['dataset'][0][0] > 0:
+                is_empty = False
+    except Exception:
+        # Falls Tabelle noch nicht existiert oder DB nicht erreichbar ist
+        is_empty = True
+
+    if is_empty:
+        logger.info("Database is empty or table doesn't exist. Triggering initial historical fetch...")
+        run_task(historical=True)
+    else:
+        logger.info("Found existing data in database. Waiting for scheduled run at 23:00.")
 
     while True:
         now = datetime.datetime.now()
         # Täglich um 23:00 Uhr
         if now.hour == 23 and now.minute == 0:
-            run_task()
+            run_task(historical=False)
             # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
             time.sleep(61)
 
diff --git a/src/exchanges/eix.py b/src/exchanges/eix.py
index 047fe73..9c7d1df 100644
--- a/src/exchanges/eix.py
+++ b/src/exchanges/eix.py
@@ -12,7 +12,7 @@ class EIXExchange(BaseExchange):
     def name(self) -> str:
         return "EIX"
 
-    def fetch_latest_trades(self) -> List[Trade]:
+    def fetch_latest_trades(self, limit: int = 1) -> List[Trade]:
         url = "https://european-investor-exchange.com/en/trade-list"
         response = requests.get(url)
         response.raise_for_status()
@@ -23,13 +23,11 @@ class EIXExchange(BaseExchange):
             return []
 
         data = json.loads(next_data_script.string)
-        # The structure according to subagent: data['props']['pageProps']['rowsData']
        rows_data = data.get('props', {}).get('pageProps', {}).get('rowsData', [])
 
         trades = []
+        count = 0
         for row in rows_data:
-            # We only want the most recent ones. For simplicity, let's pick the first one which is likely the latest.
-            # In a real daemon, we might want to track which ones we already processed.
             file_key = row.get('key')
             if not file_key:
                 continue
@@ -39,8 +37,9 @@ class EIXExchange(BaseExchange):
 
             csv_response = requests.get(csv_url)
             if csv_response.status_code == 200:
                 trades.extend(self._parse_csv(csv_response.text))
-                # Break after one file for demonstration or handle multiple
-                break
+                count += 1
+                if limit and count >= limit:
+                    break
 
         return trades
diff --git a/src/exchanges/ls.py b/src/exchanges/ls.py
index bdb4ee3..16ddd41 100644
--- a/src/exchanges/ls.py
+++ b/src/exchanges/ls.py
@@ -8,19 +8,25 @@ class LSExchange(BaseExchange):
     def name(self) -> str:
         return "LS"
 
-    def fetch_latest_trades(self) -> List[Trade]:
-        # Today's trades endpoint
-        url = "https://www.ls-x.de/_rpc/json/.lstc/instrument/list/lstctradestoday"
+    def fetch_latest_trades(self, include_yesterday: bool = False) -> List[Trade]:
+        endpoints = ["https://www.ls-x.de/_rpc/json/.lstc/instrument/list/lstctradestoday"]
+        if include_yesterday:
+            endpoints.append("https://www.ls-x.de/_rpc/json/.lstc/instrument/list/lstctradesyesterday")
 
-        # We might need headers to mimic a browser or handle disclaimer
         headers = {
             'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
             'Accept': 'application/json',
             'Referer': 'https://www.ls-tc.de/'
         }
 
-        try:
-            response = requests.get(url, headers=headers)
-            response.raise_for_status()
+        # Fetch each endpoint; skip (but log) endpoints that fail so one bad
+        # URL doesn't abort the others. The except wraps only the HTTP call,
+        # keeping the parsing lines below at their original indentation.
+        all_trades = []
+        for url in endpoints:
+            try:
+                response = requests.get(url, headers=headers)
+                response.raise_for_status()
+            except Exception as e:
+                print(f"Error fetching LS data from {url}: {e}")
+                continue
 
             import csv
             import io
@@ -42,7 +48,7 @@ class LSExchange(BaseExchange):
                     ts_str = time_str.replace('Z', '+00:00')
                     timestamp = datetime.fromisoformat(ts_str)
 
-                    trades.append(Trade(
+                    all_trades.append(Trade(
                         exchange=self.name,
                         symbol=symbol,
                         isin=isin,
@@ -55,7 +61,4 @@ class LSExchange(BaseExchange):
                     ))
                 except Exception:
                     continue
-            return trades
-        except Exception as e:
-            print(f"Error fetching LS data: {e}")
-            return []
+        return all_trades