diff --git a/Dockerfile.dashboard b/Dockerfile.dashboard new file mode 100644 index 0000000..5b22dd4 --- /dev/null +++ b/Dockerfile.dashboard @@ -0,0 +1,10 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "dashboard/server.py"] diff --git a/Dockerfile.metadata b/Dockerfile.metadata new file mode 100644 index 0000000..b7035c7 --- /dev/null +++ b/Dockerfile.metadata @@ -0,0 +1,10 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "src/metadata/fetcher.py"] diff --git a/dashboard/public/index.html b/dashboard/public/index.html new file mode 100644 index 0000000..d02982d --- /dev/null +++ b/dashboard/public/index.html @@ -0,0 +1,248 @@ + + + + + + Trading Intelligence Dashboard + + + + + + + + + + + +
+
+
+

Market Overview

+

Real-time trading analytics across multiple exchanges.

+
+
+ +
+ Connected +
+
+
+ + +
+
+

Total Trades

+

--

+
+
+

Total Volume

+

--

+
+
+

Tracked ISINs

+

--

+
+
+ + +
+
+

Trade Volume Evolution

+
+
+
+

Distribution by Continent

+
+
+
+ + +
+
+

Company Metadata

+ +
+ + + + + + + + + + + + + +
ISINNameCountryContinentSector
+
+
+ + + + diff --git a/dashboard/server.py b/dashboard/server.py new file mode 100644 index 0000000..96f7658 --- /dev/null +++ b/dashboard/server.py @@ -0,0 +1,78 @@ +from fastapi import FastAPI, HTTPException, Depends +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +import requests +import os +import pandas as pd + +app = FastAPI(title="Trading Dashboard API") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +# Serve static files +app.mount("/static", StaticFiles(directory="dashboard/public"), name="static") + +@app.get("/") +async def read_index(): + return FileResponse('dashboard/public/index.html') + +DB_USER = os.getenv("DB_USER", "admin") +DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") +DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None +DB_HOST = os.getenv("DB_HOST", "questdb") + +@app.get("/api/trades") +async def get_trades(isin: str = None, days: int = 7): + query = f"select * from trades where timestamp > dateadd('d', -{days}, now())" + if isin: + query += f" and isin = '{isin}'" + query += " order by timestamp asc" + + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + return response.json() + throw_http_error(response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/metadata") +async def get_metadata(): + query = "select * from metadata" + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + return response.json() + throw_http_error(response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/summary") +async def get_summary(): + # Group by continent/country if metadata exists + query = """ + select m.continent, m.country, count(*) as trade_count, sum(t.price * t.quantity) as total_volume + from trades t + join metadata m on t.isin = m.isin + group by m.continent, m.country + """ + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + return response.json() + throw_http_error(response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +def throw_http_error(res): + raise HTTPException(status_code=res.status_code, detail=f"QuestDB error: {res.text}") + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/docker-compose.yml b/docker-compose.yml index 82a65b0..d527f56 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,12 +15,13 @@ services: - QDB_HTTP_AUTH_ENABLED=true - QDB_HTTP_USER=${DB_USER:-admin} - QDB_HTTP_PASSWORD=${DB_PASSWORD:-quest} - # ILP Auth (optional, but good for consistency) - QDB_PG_USER=${DB_USER:-admin} - QDB_PG_PASSWORD=${DB_PASSWORD:-quest} fetcher: - build: . + build: + context: . + dockerfile: Dockerfile container_name: trading_fetcher depends_on: - questdb @@ -30,5 +31,34 @@ services: - DB_USER=${DB_USER:-admin} - DB_PASSWORD=${DB_PASSWORD:-quest} + metadata_fetcher: + build: + context: . + dockerfile: Dockerfile.metadata + container_name: metadata_fetcher + depends_on: + - questdb + restart: always + environment: + - PYTHONUNBUFFERED=1 + - DB_USER=${DB_USER:-admin} + - DB_PASSWORD=${DB_PASSWORD:-quest} + - DB_HOST=questdb + + dashboard: + build: + context: . + dockerfile: Dockerfile.dashboard + container_name: trading_dashboard + ports: + - "8080:8000" + depends_on: + - questdb + restart: always + environment: + - DB_USER=${DB_USER:-admin} + - DB_PASSWORD=${DB_PASSWORD:-quest} + - DB_HOST=questdb + volumes: questdb_data: diff --git a/requirements.txt b/requirements.txt index 1190bd8..6f16e08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,6 @@ requests beautifulsoup4 +fastapi +uvicorn +pandas +python-multipart diff --git a/src/metadata/fetcher.py b/src/metadata/fetcher.py new file mode 100644 index 0000000..906e3d6 --- /dev/null +++ b/src/metadata/fetcher.py @@ -0,0 +1,115 @@ +import requests +import time +import logging +import os +import datetime + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("MetadataDaemon") + +DB_USER = os.getenv("DB_USER", "admin") +DB_PASSWORD = os.getenv("DB_PASSWORD", "quest") +DB_AUTH = (DB_USER, DB_PASSWORD) if DB_USER and DB_PASSWORD else None +DB_HOST = os.getenv("DB_HOST", "questdb") + +def get_unique_isins(): + query = "select distinct isin from trades" + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + return [row[0] for row in data.get('dataset', []) if row[0]] + except Exception as e: + logger.error(f"Error fetching unique ISINs: {e}") + return [] + +def get_processed_isins(): + query = "select distinct isin from metadata" + try: + response = requests.get(f"http://{DB_HOST}:9000/exec", params={'query': query}, auth=DB_AUTH) + if response.status_code == 200: + data = response.json() + return [row[0] for row in data.get('dataset', []) if row[0]] + except Exception: + # Table might not exist yet + return [] + return [] + +def fetch_metadata(isin): + logger.info(f"Fetching metadata for ISIN: {isin}") + metadata = { + 'isin': isin, + 'name': 'Unknown', + 'country': 'Unknown', + 'continent': 'Unknown', + 'sector': 'Unknown' + } + + # 1. GLEIF API for Name and Country + try: + gleif_url = f"https://api.gleif.org/api/v1/lei-records?filter[isin]={isin}" + res = requests.get(gleif_url, timeout=10) + if res.status_code == 200: + data = res.json().get('data', []) + if data: + attr = data[0].get('attributes', {}) + metadata['name'] = attr.get('entity', {}).get('legalName', {}).get('name', 'Unknown') + metadata['country'] = attr.get('entity', {}).get('legalAddress', {}).get('country', 'Unknown') + except Exception as e: + logger.error(f"GLEIF error for {isin}: {e}") + + # 2. Continent mapping from Country Code + if metadata['country'] != 'Unknown': + try: + country_url = f"https://restcountries.com/v3.1/alpha/{metadata['country']}" + res = requests.get(country_url, timeout=10) + if res.status_code == 200: + data = res.json() + if data and isinstance(data, list): + continents = data[0].get('continents', []) + if continents: + metadata['continent'] = continents[0] + except Exception as e: + logger.error(f"RestCountries error for {metadata['country']}: {e}") + + return metadata + +def save_metadata(metadata): + # QuestDB Influx Line Protocol + # table,tag1=val1 field1="str",field2=num timestamp + name = metadata['name'].replace(' ', '\\ ').replace(',', '\\,') + country = metadata['country'] + continent = metadata['continent'] + sector = metadata['sector'] + isin = metadata['isin'] + + line = f'metadata,isin={isin} name="{name}",country="{country}",continent="{continent}",sector="{sector}"' + + try: + response = requests.post(f"http://{DB_HOST}:9000/write", data=line + "\n", auth=DB_AUTH) + if response.status_code not in [200, 204]: + logger.error(f"Error saving metadata: {response.text}") + except Exception as e: + logger.error(f"Connection error to QuestDB: {e}") + +def main(): + logger.info("Metadata Daemon started.") + while True: + unique_isins = get_unique_isins() + processed_isins = get_processed_isins() + + new_isins = [i for i in unique_isins if i not in processed_isins] + + if new_isins: + logger.info(f"Found {len(new_isins)} new ISINs to process.") + for isin in new_isins: + data = fetch_metadata(isin) + save_metadata(data) + time.sleep(1) # Rate limiting + else: + logger.info("No new ISINs found.") + + time.sleep(3600) # Run once per hour + +if __name__ == "__main__": + main()