Files
trading-daemon/src/exchanges/ls.py
Melchior Reimers 47a8ceab32
All checks were successful
Deployment / deploy-docker (push) Successful in 4s
feat: add automatic historical fetch on startup if DB is empty
2026-01-23 17:44:06 +01:00

60 lines
2.2 KiB
Python

import requests
from datetime import datetime
from typing import List
from .base import BaseExchange, Trade
class LSExchange(BaseExchange):
    """Exchange adapter that downloads trade lists from the LS endpoints.

    Each endpoint is read as semicolon-delimited text with a header row, and
    every parsable row is converted into a ``Trade`` tagged with this
    exchange's name.
    """

    # Upper bound (seconds) on how long a single HTTP request may stall.
    # Without it, ``requests`` waits forever on an unresponsive server and
    # the calling daemon would hang.
    _REQUEST_TIMEOUT_SECONDS = 30

    @property
    def name(self) -> str:
        """Return the identifier stored in the ``exchange`` field of trades."""
        return "LS"

    def fetch_latest_trades(self, include_yesterday: bool = False) -> List[Trade]:
        """Download and parse the published trades.

        Args:
            include_yesterday: When true, the "trades yesterday" list is
                fetched in addition to the "trades today" list.

        Returns:
            All trades that could be parsed, in endpoint order (today first,
            then yesterday). A failing endpoint is reported on stdout and
            contributes no further trades; it does not raise. Rows that
            cannot be parsed are skipped and their count is reported.
        """
        # Kept function-local (as in the original) so the module's top-level
        # imports are untouched, but hoisted out of the per-endpoint loop.
        import csv
        import io

        endpoints = ["https://www.ls-x.de/_rpc/json/.lstc/instrument/list/lstctradestoday"]
        if include_yesterday:
            endpoints.append("https://www.ls-x.de/_rpc/json/.lstc/instrument/list/lstctradesyesterday")

        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json',
            'Referer': 'https://www.ls-tc.de/'
        }

        all_trades = []
        for url in endpoints:
            try:
                response = requests.get(
                    url,
                    headers=headers,
                    timeout=self._REQUEST_TIMEOUT_SECONDS,
                )
                response.raise_for_status()

                # Header: isin;displayName;tradeTime;price;currency;size;orderId
                reader = csv.DictReader(io.StringIO(response.text), delimiter=';')

                skipped = 0
                for item in reader:
                    try:
                        # Numbers may use a decimal comma; normalise to a dot.
                        price = float(item['price'].replace(',', '.'))
                        quantity = float(item['size'].replace(',', '.'))
                        # Format: 2026-01-23T07:30:00.992000Z
                        # A trailing "Z" is rewritten to an explicit UTC
                        # offset so fromisoformat() accepts it on every
                        # Python version.
                        timestamp = datetime.fromisoformat(
                            item['tradeTime'].replace('Z', '+00:00')
                        )
                        all_trades.append(Trade(
                            exchange=self.name,
                            symbol=item['displayName'],
                            isin=item['isin'],
                            price=price,
                            quantity=quantity,
                            timestamp=timestamp
                        ))
                    except Exception:
                        # Deliberately best-effort: one bad row must not
                        # discard the rest of the feed. Count it so the
                        # loss is visible instead of silent.
                        skipped += 1

                if skipped:
                    print(f"Skipped {skipped} unparsable LS rows from {url}")
            except Exception as e:
                print(f"Error fetching LS data from {url}: {e}")
        return all_trades