diff --git a/.gitea/workflows/deploy.yaml b/.gitea/workflows/deploy.yaml index ae9865c..38789d4 100644 --- a/.gitea/workflows/deploy.yaml +++ b/.gitea/workflows/deploy.yaml @@ -6,37 +6,27 @@ on: - main jobs: - deploy-to-host: + deploy-docker: runs-on: ubuntu-latest steps: - name: Deploy via SSH uses: appleboy/ssh-action@v0.1.10 with: - host: 172.17.0.1 # Standard-IP des Docker-Hosts aus Sicht des Containers - username: root # Dein Deployment-Nutzer + host: 172.17.0.1 + username: root key: ${{ secrets.SSH_PRIVATE_KEY }} script: | mkdir -p /root/docker-files/trading-daemon cd /root/docker-files/trading-daemon - # Repository klonen oder pullen + # Repository aktualisieren if [ ! -d ".git" ]; then git clone https://git.bana.space/krumbelfix/trading-daemon.git . else git pull origin main fi - # Virtual Environment aktualisieren - python3 -m venv venv - ./venv/bin/pip install -r requirements.txt + # Docker Container neu bauen und starten + docker-compose up -d --build - # Systemd Update - cp systemd/trading-daemon.service /etc/systemd/system/ - cp systemd/trading-daemon.timer /etc/systemd/system/ - systemctl daemon-reload - - # Neustart - systemctl restart trading-daemon.timer - systemctl start trading-daemon.service - - echo "Deployment auf dem Host-System erfolgreich." + echo "Deployment abgeschlossen. Container laufen." diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a5bab4f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Wir nutzen einfach ein kurzes Script oder lassen den Daemon in einem Loop laufen. +# Da der User "täglich um 23h" möchte, bauen wir die Zeitsteuerung in den Daemon ein +# ODER wir nutzen einen einfachen Sleep-Loop. + +CMD ["python", "daemon.py"] diff --git a/daemon.py b/daemon.py index 210ad59..391e90f 100644 --- a/daemon.py +++ b/daemon.py @@ -1,5 +1,8 @@ import time import logging +import datetime +import os +import requests from src.exchanges.eix import EIXExchange from src.exchanges.ls import LSExchange from src.database.questdb_client import DatabaseClient @@ -10,30 +13,74 @@ logging.basicConfig( ) logger = logging.getLogger("TradingDaemon") -def main(): - logger.info("Starting Trading Data Fetcher") - - # Initialize components +DB_USER = os.getenv("DB_USER", "admin") +DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") +DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None + +def get_last_trade_timestamp(db_url, exchange_name): + # QuestDB query: get the latest timestamp for a specific exchange + query = f"trades where exchange = '{exchange_name}' latest by timestamp" + try: + # Using the /exec endpoint to get data + response = requests.get(f"{db_url}/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + if data['dataset']: + # QuestDB returns timestamp in micros since epoch by default in some views, or ISO + # Let's assume the timestamp is in the dataset + # ILP timestamps are stored as designated timestamps. + ts_value = data['dataset'][0][0] # Adjust index based on column order + if isinstance(ts_value, str): + return datetime.datetime.fromisoformat(ts_value.replace('Z', '+00:00')) + else: + return datetime.datetime.fromtimestamp(ts_value / 1000000, tz=datetime.timezone.utc) + except Exception as e: + logger.debug(f"No existing data for {exchange_name} or DB unreachable: {e}") + return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + +def run_task(): + logger.info("Starting Trading Data Fetcher task...") exchanges = [ EIXExchange(), LSExchange() ] - db = DatabaseClient() + db = DatabaseClient(host="questdb", user=DB_USER, password=DB_PASSWORD) - # Process each exchange for exchange in exchanges: try: - logger.info(f"Fetching data from {exchange.name}...") - trades = exchange.fetch_latest_trades() - logger.info(f"Fetched {len(trades)} trades from {exchange.name}.") + db_url = "http://questdb:9000" + last_ts = get_last_trade_timestamp(db_url, exchange.name) - if trades: - db.save_trades(trades) - logger.info(f"Stored {len(trades)} trades in database.") + logger.info(f"Fetching data from {exchange.name} (Filtering trades older than {last_ts})...") + trades = exchange.fetch_latest_trades() + + # Deduplizierung: Nur Trades nehmen, die neuer sind als der letzte in der DB + new_trades = [ + t for t in trades + if t.timestamp.replace(tzinfo=datetime.timezone.utc) > last_ts.replace(tzinfo=datetime.timezone.utc) + ] + + logger.info(f"Found {len(trades)} total trades, {len(new_trades)} are new.") + + if new_trades: + db.save_trades(new_trades) + logger.info(f"Stored {len(new_trades)} new trades in QuestDB.") except Exception as e: logger.error(f"Error processing exchange {exchange.name}: {e}") - logger.info("Fetching cycle complete.") +def main(): + logger.info("Trading Daemon started. Waiting for 23:00 to run task.") + + while True: + now = datetime.datetime.now() + # Täglich um 23:00 Uhr + if now.hour == 23 and now.minute == 0: + run_task() + # Warte 61s, um Mehrfachausführung in derselben Minute zu verhindern + time.sleep(61) + + # Check alle 30 Sekunden + time.sleep(30) if __name__ == "__main__": main() diff --git a/docker-compose.yml b/docker-compose.yml index 75f1300..e612ed4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,16 @@ services: - "9009:9009" volumes: - questdb_data:/root/.questdb + restart: always + + fetcher: + build: . + container_name: trading_fetcher + depends_on: + - questdb + restart: always + environment: + - PYTHONUNBUFFERED=1 volumes: questdb_data: diff --git a/src/database/questdb_client.py b/src/database/questdb_client.py index 1532222..2d3495c 100644 --- a/src/database/questdb_client.py +++ b/src/database/questdb_client.py @@ -4,10 +4,11 @@ from typing import List from ..exchanges.base import Trade class DatabaseClient: - def __init__(self, host: str = "localhost", port: int = 9000): + def __init__(self, host: str = "localhost", port: int = 9000, user: str = None, password: str = None): self.host = host self.port = port self.url = f"http://{host}:{port}/write" + self.auth = (user, password) if user and password else None def save_trades(self, trades: List[Trade]): if not trades: @@ -15,10 +16,6 @@ class DatabaseClient: lines = [] for trade in trades: - # QuestDB Influx Line Protocol format: - # table_name,tag1=val1,tag2=val2 field1=val1,field2=val2 timestamp - # We use microseconds for timestamp (nanoseconds is standard for ILP) - # Clean symbols for ILP symbol = trade.symbol.replace(" ", "\\ ").replace(",", "\\,") exchange = trade.exchange @@ -31,8 +28,13 @@ class DatabaseClient: payload = "\n".join(lines) + "\n" try: - response = requests.post(self.url, data=payload, params={'precision': 'ns'}) - if response.status_code != 204: + response = requests.post( + self.url, + data=payload, + params={'precision': 'ns'}, + auth=self.auth + ) + if response.status_code not in [204, 200]: print(f"Error saving to QuestDB: {response.text}") except Exception as e: print(f"Could not connect to QuestDB at {self.url}: {e}")