fetch data bug fix for both index and DVA/DCA calculation

This commit is contained in:
2026-02-02 06:48:49 +08:00
parent d33b521b22
commit 6506932042
15 changed files with 35346 additions and 16918 deletions
+118 -105
View File
@@ -3,7 +3,8 @@ import requests
import os
import csv
import shutil
from datetime import datetime, time
import time
from datetime import datetime
import yfinance as yf
from ta.trend import EMAIndicator
from ta.momentum import StochasticOscillator
@@ -11,73 +12,64 @@ import math
class DataEngine:
def __init__(self, symbol=None, url=None, provider=None, data_dir='data_cache'):
# 1. Clean the incoming symbol
# 1. Clean and set the symbol
self.symbol = symbol.strip().upper() if symbol else None
self.file_path = f"data_cache/{self.symbol}.csv"
# 2. Setup centralized paths
base_path = os.path.dirname(os.path.abspath(__file__))
self.cache_dir = os.path.join(base_path, data_dir)
os.makedirs(self.cache_dir, exist_ok=True)
# 3. Load the master instrument list to find URLs/Providers
# This ensures the engine knows where to go for special tickers
# 3. Load master instrument list
self.master_instruments = self.load_instruments_from_csv('instruments.csv')
# 4. Find config from master list or use passed-in arguments
instrument_config = next((i for i in self.master_instruments if i['symbol'] == self.symbol), None)
# 4. Resolve Config (Priority: CSV > Arguments > Yahoo Fallback)
config = next((i for i in self.master_instruments if i['symbol'].upper() == self.symbol), None)
if instrument_config:
self.url = instrument_config['url']
self.provider = instrument_config['provider']
else:
# Fallback to arguments if ticker isn't in the CSV list
if config:
self.url = config['url']
self.provider = config['provider']
elif url:
self.url = url
self.provider = provider or 'yahoo'
# 5. Define final file path for centralized storage
if self.symbol:
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
elif self.symbol:
# Automatic Fallback for missing tickers
self.url = f"https://query1.finance.yahoo.com/v8/finance/chart/{self.symbol}?interval=1d&range=2y"
self.provider = 'yahoo'
else:
self.file_path = None
self.url = None
self.provider = None
# 5. Define file path and auto-sync
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv") if self.symbol else None
self.ensure_data()
# This now handles the "24-hour check" automatically
if self.symbol:
self.ensure_data()
def ensure_data(self):
"""Checks if file exists; if not, downloads it."""
"""Checks if file exists and is fresh (less than 24h old)."""
CACHE_EXPIRY = 24 * 3600 # 24 hours
if os.path.exists(self.file_path):
return True # Data is already there
print(f"DEBUG: {self.symbol} not found in cache. Attempting download...")
try:
# For a generic ticker like SPY, we use yfinance
import yfinance as yf
df = yf.download(self.symbol, period="max")
if df.empty:
print(f"ERROR: No data found for {self.symbol}")
return False
# Clean and save
# 1. If columns are MultiIndex (tuples), take just the first level (the price name)
if isinstance(df.columns, pd.MultiIndex):
df.columns = df.columns.get_level_values(0)
# 2. Reset index to turn 'Date' into a column
df.reset_index(inplace=True)
# 3. Now it is safe to lowercase the column names
df.columns = [str(c).lower() for c in df.columns]
# NEW: Check how old the file is
file_age = time.time() - os.path.getmtime(self.file_path)
if file_age < CACHE_EXPIRY:
return True # Data is actually fresh
else:
print(f"DEBUG: {self.symbol} cache is stale ({round(file_age/3600)}h old). Refreshing...")
else:
print(f"DEBUG: {self.symbol} not found in cache. Attempting download...")
# If we reached here, it means we either have NO file or a STALE file
# Instead of just yfinance, call your specialized fetch_data()
# which uses the URLs from your TEMPLATES
return self.fetch_data()
df.to_csv(self.file_path, index=False)
print(f"DEBUG: Successfully cached {self.symbol}")
return True
except Exception as e:
print(f"ERROR: Download failed for {self.symbol}: {e}")
return False
def load_instruments_from_csv(self, file_path):
instruments = []
# Dynamic templates based on your preference
TEMPLATES = {
'jpm': "https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={cusip}&country=hk&role=per",
'yahoo': "https://query1.finance.yahoo.com/v8/finance/chart/{cusip}?period1=0&period2=9999999999&interval=1d&events=history",
@@ -85,26 +77,22 @@ class DataEngine:
}
try:
# Get absolute path relative to this script
abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
if not os.path.exists(abs_path):
print(f"Error: Master list {file_path} not found at {abs_path}")
return []
with open(abs_path, mode='r', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile)
# Clean header names (lowercase + remove whitespace)
reader.fieldnames = [name.strip().lower() for name in reader.fieldnames]
for row in reader:
# Use .get() with fallback to avoid KeyErrors
symbol = (row.get('symbol') or '').strip().upper()
cusip = (row.get('cusip') or '').strip()
provider = (row.get('provider') or 'jpm').strip().lower()
provider = (row.get('provider') or 'yahoo').strip().lower()
if symbol and cusip:
template = TEMPLATES.get(provider, TEMPLATES['jpm'])
# Build URL from template
template = TEMPLATES.get(provider, TEMPLATES['yahoo'])
url = template.format(cusip=cusip)
instruments.append({
@@ -113,10 +101,8 @@ class DataEngine:
"provider": provider,
"cusip": cusip
})
except Exception as e:
print(f"CRITICAL: Failed to load instruments.csv: {e}")
return instruments
def _ensure_data_exists(self):
@@ -318,57 +304,67 @@ class DataEngine:
def fetch_data(self):
local_df = pd.DataFrame()
new_df = None
# 1. Load Local Cache & Force Date Type
if os.path.exists(self.file_path):
CACHE_EXPIRY = 24 * 3600
file_exists = os.path.exists(self.file_path)
# 1. Load Local Cache & Check Age
needs_refresh = True
if file_exists:
try:
local_df = pd.read_csv(self.file_path)
local_df = local_df.loc[:, ~local_df.columns.duplicated()].copy()
local_df.columns = [c.lower().strip() for c in local_df.columns]
local_df = local_df.rename(columns={'price': 'close', 'nav': 'close'})
# FORCE CONVERSION: This fixes the '<' error
# errors='coerce' turns bad text into NaT (Not a Time), which we then drop
local_df['date'] = pd.to_datetime(local_df['date'], errors='coerce')
local_df = local_df.dropna(subset=['date']).reset_index(drop=True)
file_age = time.time() - os.path.getmtime(self.file_path)
if file_age < CACHE_EXPIRY:
needs_refresh = False
print(f"🚀 Using Cache: {self.symbol} ({round(file_age/3600, 1)}h old).")
except Exception as e:
print(f"Local Load Error: {e}")
print(f"⚠️ Cache read error for {self.symbol}: {e}")
# 2. Network Fetch
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
response = requests.get(self.url, headers=headers, timeout=15)
response.raise_for_status()
if needs_refresh:
try:
if not self.url or str(self.url).lower() == 'none':
print(f"❌ No URL found for {self.symbol}.")
return local_df
if self.provider == 'agi':
new_df = self._parse_ft_html(response.text)
elif self.provider == 'jpm':
new_df = self._parse_jpm(response.json())
elif self.provider == 'yahoo':
new_df = self._parse_yahoo(response.json())
print(f"📡 Syncing {self.symbol} from {self.provider}...")
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
response = requests.get(self.url, headers=headers, timeout=15)
if response.status_code == 200:
new_df = None
# --- PROVIDER ROUTING ---
# ROUTING TO PARSERS
if self.provider == 'yahoo':
new_df = self._parse_yahoo(response.json())
elif self.provider == 'jpm':
new_df = self._parse_jpm(response.json())
elif self.provider in ['agi', 'ft']:
new_df = self._parse_ft_html(response.text)
# 3. Safe Merge & Sort
if new_df is not None and not new_df.empty:
# Force new_df dates to match local_df format
new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce')
combined_df = pd.concat([local_df, new_df], ignore_index=True)
combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
# SORTING: Now safe because all types are Timestamps
combined_df = combined_df.sort_values('date').reset_index(drop=True)
if 'close' in combined_df.columns:
final_df = combined_df[['date', 'close']].dropna()
final_df.to_csv(self.file_path, index=False)
return final_df
return local_df
# --- MERGE & SAVE ---
if new_df is not None and not new_df.empty:
new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce')
combined_df = pd.concat([local_df, new_df], ignore_index=True)
combined_df = combined_df.sort_values('date').drop_duplicates(subset=['date'], keep='last')
final_df = combined_df.dropna(subset=['date', 'close'])
final_df[['date', 'close']].to_csv(self.file_path, index=False)
print(f"{self.symbol} updated to {final_df['date'].max().date()}")
return final_df
else:
print(f"⚠️ Could not parse data for {self.symbol} via {self.provider}")
else:
print(f"⚠️ {self.provider} returned status {response.status_code}")
except Exception as e:
print(f"Network error for {self.symbol}: {e}")
return local_df
except Exception as e:
print(f"❌ Sync failed for {self.symbol}: {e}")
return local_df
def get_local_metrics(self):
"""Reads ONLY from local CSV and returns metrics immediately."""
@@ -393,26 +389,43 @@ class DataEngine:
if df is None or df.empty or len(df) < 2:
return None
last_close = float(df.iloc[-1]['close'])
# Get the last row for price and date
last_row = df.iloc[-1]
last_close = float(last_row['close'])
# 1. Extract and format the date
# Handles both datetime objects and string dates
last_date = last_row['date']
if hasattr(last_date, 'strftime'):
formatted_date = last_date.strftime('%Y-%m-%d')
else:
formatted_date = str(last_date).split(' ')[0] # Fallback for strings
prev_close = float(df.iloc[-2]['close'])
change_pct = ((last_close - prev_close) / prev_close) * 100
count = len(df)
def get_ema_offset(window):
if count >= window:
from ta.trend import EMAIndicator
ema = EMAIndicator(close=df['close'], window=window).ema_indicator().iloc[-1]
return round(((last_close / ema) * 100) - 100, 1)
return "N/A"
k_val = d_val = "N/A"
if count >= 14:
high_14 = df['close'].rolling(window=14).max()
low_14 = df['close'].rolling(window=14).min()
stoch = StochasticOscillator(high=high_14, low=low_14, close=df['close'], window=14)
from ta.momentum import StochasticOscillator
# Use high/low columns if they exist, otherwise fallback to close for Stoch
high_src = df['high'] if 'high' in df.columns else df['close']
low_src = df['low'] if 'low' in df.columns else df['close']
stoch = StochasticOscillator(high=high_src, low=low_src, close=df['close'], window=14)
k_val = round(stoch.stoch().iloc[-1], 0)
d_val = round(stoch.stoch_signal().iloc[-1], 0)
return {
"symbol": self.symbol,
"last_date": formatted_date, # <--- New field added
"last_close": round(last_close, 2),
"change_pct": round(change_pct, 2),
"low_52": round(float(df.tail(252)['close'].min()), 2),
@@ -421,7 +434,7 @@ class DataEngine:
"last_ema50": get_ema_offset(50),
"last_ema100": get_ema_offset(100),
"last_ema200": get_ema_offset(200),
"kd_values": f"{k_val}/{d_val}" if k_val != "N/A" else "N/A"
"kd_values": f"{int(k_val)}/{int(d_val)}" if k_val != "N/A" else "N/A"
}
class StrategyEngine: