feat: move daemon to docker with daily schedule and deduplication
All checks were successful
Deployment / deploy-docker (push) Successful in 17s

This commit is contained in:
Melchior Reimers
2026-01-23 17:24:05 +01:00
parent 7d9651e04c
commit 746c7167b0
5 changed files with 100 additions and 37 deletions

View File

@@ -1,5 +1,8 @@
import time
import logging
import datetime
import os
import requests
from src.exchanges.eix import EIXExchange
from src.exchanges.ls import LSExchange
from src.database.questdb_client import DatabaseClient
@@ -10,30 +13,74 @@ logging.basicConfig(
)
logger = logging.getLogger("TradingDaemon")
def main():
logger.info("Starting Trading Data Fetcher")
# Initialize components
DB_USER = os.getenv("DB_USER", "admin")
DB_PASSWORD = os.getenv("DB_PASSWORD", "quest")
DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None
def get_last_trade_timestamp(db_url, exchange_name):
# QuestDB query: get the latest timestamp for a specific exchange
query = f"trades where exchange = '{exchange_name}' latest by timestamp"
try:
# Using the /exec endpoint to get data
response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH)
if response.status_code == 200:
data = response.json()
if data['dataset']:
# QuestDB returns timestamp in micros since epoch by default in some views, or ISO
# Let's assume the timestamp is in the dataset
# ILP timestamps are stored as designated timestamps.
ts_value = data['dataset'][0][0] # Adjust index based on column order
if isinstance(ts_value, str):
return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
else:
return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc)
except Exception as e:
logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}")
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
def run_task():
logger.info("Starting Trading Data Fetcher task...")
exchanges = [
EIXExchange(),
LSExchange()
]
db = DatabaseClient()
db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD)
# Process each exchange
for exchange in exchanges:
try:
logger.info(f"Fetching data from {exchange.name}...")
trades = exchange.fetch_latest_trades()
logger.info(f"Fetched {len(trades)} trades from {exchange.name}.")
db_url = "http://questdb:9000"
last_ts = get_last_trade_timestamp(db_url, exchange.name)
if trades:
db.save_trades(trades)
logger.info(f"Stored {len(trades)} trades in database.")
logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...")
trades = exchange.fetch_latest_trades()
# Deduplizierung: Nur Trades nehmen, die neuer sind als der letzte in der DB
new_trades = [
t for t in trades
if t.timestamp.replace(tzinfo=datetime.timezone.utc) > last_ts.replace(tzinfo=datetime.timezone.utc)
]
logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.")
if new_trades:
db.save_trades(new_trades)
logger.info(f"Stored {len(new_trades)} new trades in QuestDB.")
except Exception as e:
logger.error(f"Error processing exchange {exchange.name}: {e}")
logger.info("Fetching cycle complete.")
def main():
logger.info("Trading Daemon started. Waiting for 23:00 to run task.")
while True:
now = datetime.datetime.now()
# Täglich um 23:00 Uhr
if now.hour == 23 and now.minute == 0:
run_task()
# Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern
time.sleep(61)
# Check alle 30 Sekunden
time.sleep(30)
if __name__ == "__main__":
main()