# historical-prices/engine.py - Python source (718 lines, 30 KiB)

import pandas as pd
import requests
import os
import csv
import shutil
import time
from datetime import datetime
import yfinance as yf
from ta.trend import EMAIndicator
from ta.momentum import StochasticOscillator
import math
class DataEngine:
    """Download, cache and summarise daily closing prices for one instrument.

    Prices are cached as ``<SYMBOL>.csv`` (columns ``date`` and ``close``)
    inside ``cache_dir``. The download source is resolved in priority order:
    the row for the symbol in ``instruments.csv``, then the ``url`` /
    ``provider`` constructor arguments, then a generic Yahoo chart URL.
    """

    # Maximum age (in seconds) of a cache file before a refresh is attempted.
    CACHE_EXPIRY = 24 * 3600

    _FT_TEMPLATE = "https://markets.ft.com/data/funds/tearsheet/historical?s={cusip}"

    # Download URL per provider; ``{cusip}`` is replaced by the CSV's cusip value.
    # 'ft' is listed next to 'agi' because fetch_data() routes both names to the
    # FT HTML parser, so both must resolve to the FT page.
    URL_TEMPLATES = {
        'jpm': "https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={cusip}&country=hk&role=per",
        'yahoo': "https://query1.finance.yahoo.com/v8/finance/chart/{cusip}?period1=0&period2=9999999999&interval=1d&events=history",
        'agi': _FT_TEMPLATE,
        'ft': _FT_TEMPLATE,
    }

    def __init__(self, symbol=None, url=None, provider=None, data_dir='data_cache'):
        """Resolve the data source for ``symbol`` and make sure a cache exists.

        Args:
            symbol: Ticker / fund code; stripped and upper-cased. May be None
                for an engine that is only used for ``global_sync``.
            url: Download URL used when the symbol is not in ``instruments.csv``.
            provider: Parser name for ``url`` (defaults to ``'yahoo'``).
            data_dir: Cache folder name, relative to this module's directory.

        Note: when a symbol is given, construction may perform a network
        download through ``ensure_data()``.
        """
        self.symbol = symbol.strip().upper() if symbol else None

        # The cache lives next to this module, not in the current working directory.
        base_path = os.path.dirname(os.path.abspath(__file__))
        self.cache_dir = os.path.join(base_path, data_dir)
        os.makedirs(self.cache_dir, exist_ok=True)

        self.master_instruments = self.load_instruments_from_csv('instruments.csv')

        # Priority: instruments.csv > constructor arguments > Yahoo fallback.
        config = next(
            (i for i in self.master_instruments if i['symbol'].upper() == self.symbol),
            None,
        )
        if config:
            self.url = config['url']
            self.provider = config['provider']
        elif url:
            self.url = url
            self.provider = provider or 'yahoo'
        elif self.symbol:
            self.url = f"https://query1.finance.yahoo.com/v8/finance/chart/{self.symbol}?interval=1d&range=2y"
            self.provider = 'yahoo'
        else:
            self.url = None
            self.provider = None

        self.file_path = (
            os.path.join(self.cache_dir, f"{self.symbol}.csv") if self.symbol else None
        )
        if self.symbol:
            self.ensure_data()

    def ensure_data(self):
        """Return True when the cache is fresh, otherwise the result of ``fetch_data()``.

        A cache file younger than ``CACHE_EXPIRY`` seconds counts as fresh.
        A missing or stale file triggers ``fetch_data()``, whose DataFrame
        (possibly empty) is returned.
        """
        if self.file_path and os.path.exists(self.file_path):
            file_age = time.time() - os.path.getmtime(self.file_path)
            if file_age < self.CACHE_EXPIRY:
                return True
            print(f"DEBUG: {self.symbol} cache is stale ({round(file_age/3600)}h old). Refreshing...")
        else:
            print(f"DEBUG: {self.symbol} not found in cache. Attempting download...")
        return self.fetch_data()

    def load_instruments_from_csv(self, file_path):
        """Read the instrument list and build a download URL for every row.

        Args:
            file_path: CSV path, resolved relative to this module's directory.
                Expected headers (case-insensitive): symbol, cusip, provider.

        Returns:
            A list of dicts with keys ``symbol``, ``url``, ``provider`` and
            ``cusip``. Rows without both symbol and cusip are skipped. A
            missing or unreadable file yields the rows read so far (possibly
            an empty list).
        """
        instruments = []
        try:
            abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
            if not os.path.exists(abs_path):
                return []
            # utf-8-sig strips a byte-order mark if the file starts with one.
            with open(abs_path, mode='r', encoding='utf-8-sig') as csvfile:
                reader = csv.DictReader(csvfile)
                if not reader.fieldnames:
                    return instruments  # empty file: no header row to normalise
                reader.fieldnames = [name.strip().lower() for name in reader.fieldnames]
                for row in reader:
                    symbol = (row.get('symbol') or '').strip().upper()
                    cusip = (row.get('cusip') or '').strip()
                    provider = (row.get('provider') or 'yahoo').strip().lower()
                    if not (symbol and cusip):
                        continue
                    if provider not in self.URL_TEMPLATES:
                        # An unknown provider gets the Yahoo URL, so record it as
                        # 'yahoo' too; otherwise fetch_data() has no parser for it.
                        provider = 'yahoo'
                    instruments.append({
                        "symbol": symbol,
                        "url": self.URL_TEMPLATES[provider].format(cusip=cusip),
                        "provider": provider,
                        "cusip": cusip,
                    })
        except Exception as e:
            # Best-effort: a broken instrument list must not prevent construction.
            print(f"CRITICAL: Failed to load instruments.csv: {e}")
        return instruments

    def _ensure_data_exists(self):
        """Download the symbol's data if no cache file exists yet.

        Symbols found in the master list are fetched through ``fetch_data()``
        with that entry's URL and provider; anything else goes through the
        generic yfinance fallback.
        """
        if self.file_path and os.path.exists(self.file_path):
            return
        match = next(
            (i for i in self.master_instruments if i['symbol'].upper() == self.symbol),
            None,
        )
        if match:
            print(f"DEBUG: Found {self.symbol} in master list. Fetching from {match['provider']}...")
            self.url = match['url']
            self.provider = match['provider']
            self.fetch_data()
        else:
            print(f"DEBUG: {self.symbol} not in master list. Trying generic Yahoo Finance...")
            self._download_generic_yahoo()

    def _download_generic_yahoo(self):
        """Download the full history with yfinance and write it to the cache file."""
        try:
            df = yf.download(self.symbol, period="max")
            if df is not None and not df.empty:
                df.reset_index(inplace=True)
                # Column labels may be tuples when the frame has MultiIndex
                # columns; keep the first level so .lower() cannot fail.
                df.columns = [
                    str(col[0] if isinstance(col, tuple) else col).strip().lower()
                    for col in df.columns
                ]
                df.to_csv(self.file_path, index=False)
        except Exception as e:
            print(f"Yahoo fallback failed: {e}")

    def global_sync(self):
        """Back up the cache, refresh every listed instrument and report the outcome.

        Returns:
            dict with ``total``, ``updated``, ``failed`` counters and a
            ``details`` list holding one message per instrument.

        The engine's own symbol / provider / url / file_path are restored
        afterwards, so an engine built for one symbol still points at it.
        """
        self.run_pre_sync_maintenance()
        instruments = self.load_instruments_from_csv('instruments.csv')
        report = {
            "total": len(instruments),
            "updated": 0,
            "failed": 0,
            "details": []
        }
        saved_state = (self.symbol, self.provider, self.url, self.file_path)
        try:
            for item in instruments:
                try:
                    self.symbol = item['symbol']
                    self.provider = item['provider']
                    self.url = item['url']
                    self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
                    print(f"Updating {self.symbol}...")
                    result_df = self.fetch_data()
                    time.sleep(1)  # pause between instruments
                    if result_df is not None and not result_df.empty:
                        report["updated"] += 1
                        last_price = result_df['close'].iloc[-1]
                        report["details"].append(f"{self.symbol}: Updated (Price: {last_price})")
                    else:
                        report["failed"] += 1
                        report["details"].append(f"{self.symbol}: No new data found")
                except Exception as e:
                    report["failed"] += 1
                    report["details"].append(f"⚠️ {self.symbol}: Error ({str(e)})")
        finally:
            self.symbol, self.provider, self.url, self.file_path = saved_state
        return report

    def run_pre_sync_maintenance(self):
        """Copy every cached CSV into a timestamped backup folder and report on it.

        Returns:
            DataFrame with one row per backed-up file (Fund, Rows, Start,
            End); empty when the cache directory is missing or holds no CSV.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        current_backup_path = os.path.join(base_dir, 'backups', f"sync_backup_{timestamp}")

        print(f"\n--- Pre-Sync Health Check ({timestamp}) ---")
        if not os.path.exists(self.cache_dir):
            print(f"⚠️ Cache directory not found at {self.cache_dir}")
            return pd.DataFrame()

        # Created only once we know there is a cache to back up.
        os.makedirs(current_backup_path, exist_ok=True)

        stats = []
        for filename in sorted(os.listdir(self.cache_dir)):
            if not filename.endswith(".csv"):
                continue
            src = os.path.join(self.cache_dir, filename)
            dst = os.path.join(current_backup_path, filename)
            try:
                shutil.copy2(src, dst)
                df = pd.read_csv(src)
                has_date = 'date' in df.columns
                stats.append({
                    "Fund": filename.replace(".csv", ""),
                    "Rows": len(df),
                    "Start": df['date'].min() if has_date else "N/A",
                    "End": df['date'].max() if has_date else "N/A"
                })
                print(f"📦 Backed up: {filename} ({len(df)} rows)")
            except Exception as e:
                print(f"⚠️ Could not backup {filename}: {e}")

        if not stats:
            print("📭 No CSV files found to backup.")
            return pd.DataFrame()

        stats_df = pd.DataFrame(stats)
        print("\n" + stats_df.to_string(index=False))
        print(f"\n✅ All backups saved to: {current_backup_path}")
        return stats_df

    def _parse_jpm(self, json_data):
        """Turn a JPM ``historicalNAVList`` payload into a date/close frame.

        Returns None when the payload lacks the list or the resulting frame
        lacks the ``date`` / ``close`` columns that ``fetch_data()`` requires.
        """
        if not (isinstance(json_data, dict) and "historicalNAVList" in json_data):
            return None
        df = pd.DataFrame(json_data["historicalNAVList"]).rename(columns={'navPrice': 'close'})
        if 'date' not in df.columns or 'close' not in df.columns:
            return None
        return df

    def _parse_ft_html(self, html_text):
        """Extract date/close rows from the FT historical-prices HTML table.

        Returns a frame sorted by date, or None when the table is missing,
        empty, or cannot be parsed.
        """
        try:
            # Imported lazily so bs4 is only required for FT instruments.
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(html_text, 'html.parser')
            table = soup.find('table', class_='mod-tearsheet-historical-prices__results')
            if not table:
                print(f"❌ Could not find the results table in the HTML for {self.symbol}")
                return None

            data = []
            for row in table.find('tbody').find_all('tr'):
                cols = row.find_all('td')
                if len(cols) < 5:
                    continue
                # Prefer the dedicated span inside the date cell; fall back to
                # the whole cell text when that span is absent.
                date_cell = cols[0].find('span', class_='mod-ui-hide-small-below')
                date_str = (date_cell or cols[0]).get_text(strip=True)
                # The close price is read from the fifth column.
                close_str = cols[4].get_text(strip=True).replace(',', '')
                data.append({'date': date_str, 'close': close_str})

            df = pd.DataFrame(data)
            if df.empty:
                return None
            df['date'] = pd.to_datetime(df['date'], errors='coerce')
            df['close'] = pd.to_numeric(df['close'], errors='coerce')
            return df.dropna().sort_values('date').reset_index(drop=True)
        except Exception as e:
            print(f"❌ Failed to parse FT HTML structure: {e}")
            return None

    def _parse_yahoo(self, json_data):
        """Parse a Yahoo Finance v8 chart payload into a date/close frame.

        Only the ``close`` quote series is read. Returns None when the
        payload does not have the expected structure.
        """
        try:
            chart = json_data['chart']['result'][0]
            timestamps = chart['timestamp']
            closes = chart['indicators']['quote'][0].get('close', [])
            return pd.DataFrame({
                'date': pd.to_datetime(timestamps, unit='s'),
                'close': closes
            })
        except (KeyError, IndexError, TypeError, ValueError, AttributeError):
            # Missing keys, an empty/None result list, or series of unequal length.
            return None

    def fetch_data(self):
        """Return price data, refreshing the cache from the network when stale.

        Returns:
            The merged DataFrame after a successful download; otherwise the
            locally cached frame (possibly empty) - also when the cache is
            still fresh, no URL is configured, or the download fails.
        """
        local_df = pd.DataFrame()
        if not self.file_path:
            return local_df  # no symbol, hence nothing to read or refresh

        needs_refresh = True
        if os.path.exists(self.file_path):
            try:
                local_df = pd.read_csv(self.file_path)
                local_df['date'] = pd.to_datetime(local_df['date'], errors='coerce')
                file_age = time.time() - os.path.getmtime(self.file_path)
                if file_age < self.CACHE_EXPIRY:
                    needs_refresh = False
                    print(f"🚀 Using Cache: {self.symbol} ({round(file_age/3600, 1)}h old).")
            except Exception as e:
                print(f"⚠️ Cache read error for {self.symbol}: {e}")

        if not needs_refresh:
            return local_df

        try:
            if not self.url or str(self.url).lower() == 'none':
                print(f"❌ No URL found for {self.symbol}.")
                return local_df

            print(f"📡 Syncing {self.symbol} from {self.provider}...")
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
            response = requests.get(self.url, headers=headers, timeout=15)
            if response.status_code != 200:
                print(f"⚠️ {self.provider} returned status {response.status_code}")
                return local_df

            new_df = None
            if self.provider == 'yahoo':
                new_df = self._parse_yahoo(response.json())
            elif self.provider == 'jpm':
                new_df = self._parse_jpm(response.json())
            elif self.provider in ['agi', 'ft']:
                new_df = self._parse_ft_html(response.text)

            if new_df is None or new_df.empty:
                print(f"⚠️ Could not parse data for {self.symbol} via {self.provider}")
                return local_df

            new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce')
            combined_df = pd.concat([local_df, new_df], ignore_index=True)
            # A stable sort keeps downloaded rows after cached rows for the same
            # date, so keep='last' reliably prefers the fresh value.
            combined_df = combined_df.sort_values('date', kind='mergesort')
            combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
            final_df = combined_df.dropna(subset=['date', 'close'])
            final_df[['date', 'close']].to_csv(self.file_path, index=False)
            print(f"{self.symbol} updated to {final_df['date'].max().date()}")
            return final_df
        except Exception as e:
            print(f"❌ Sync failed for {self.symbol}: {e}")
        return local_df

    def get_local_metrics(self):
        """Compute the metrics table from the local cache only (no network).

        Returns:
            The dict from ``calculate_table_metrics``; an error dict when no
            cache file exists; None when the file cannot be read or holds
            fewer than two usable rows.
        """
        if not self.file_path or not os.path.exists(self.file_path):
            return {"error": "Missing Local Data", "status": "needs_sync"}
        try:
            df = pd.read_csv(self.file_path)
            df.columns = [c.lower().strip() for c in df.columns]
            df['date'] = pd.to_datetime(df['date'], errors='coerce')
            df = df.dropna(subset=['date', 'close']).sort_values('date')
            return self.calculate_table_metrics(df)
        except Exception as e:
            print(f"Error reading local data for {self.symbol}: {e}")
            return None

    def calculate_table_metrics(self, df):
        """Summarise the latest price against recent history.

        Args:
            df: Frame sorted by date with ``date`` and ``close`` columns;
                optional ``high`` / ``low`` columns feed the stochastic.

        Returns:
            None for fewer than two rows, else a dict with the last date and
            close, day-over-day change in percent, min/max close over the last
            252 rows, percentage offsets from the 20/50/100/200 EMAs and a
            ``"K/D"`` stochastic string. Any value that cannot be computed is
            reported as ``"N/A"``.
        """
        if df is None or df.empty or len(df) < 2:
            return None

        closes = df['close']
        count = len(df)
        last_row = df.iloc[-1]
        last_close = float(last_row['close'])

        # The date may be a datetime-like object or an already formatted string.
        last_date = last_row['date']
        if hasattr(last_date, 'strftime'):
            formatted_date = last_date.strftime('%Y-%m-%d')
        else:
            formatted_date = str(last_date).split(' ')[0]

        prev_close = float(df.iloc[-2]['close'])
        if prev_close:
            change_pct = round(((last_close - prev_close) / prev_close) * 100, 2)
        else:
            change_pct = "N/A"  # a zero previous close has no percentage change

        def get_ema_offset(window):
            """Return the last close's percentage distance from the EMA, or "N/A"."""
            if count < window:
                return "N/A"
            from ta.trend import EMAIndicator
            ema = EMAIndicator(close=closes, window=window).ema_indicator().iloc[-1]
            if pd.isna(ema) or ema == 0:
                return "N/A"
            return round(((last_close / ema) * 100) - 100, 1)

        kd_values = "N/A"
        if count >= 14:
            from ta.momentum import StochasticOscillator
            # Without high/low columns the close series stands in for both.
            high_src = df['high'] if 'high' in df.columns else closes
            low_src = df['low'] if 'low' in df.columns else closes
            stoch = StochasticOscillator(high=high_src, low=low_src, close=closes, window=14)
            k_raw = stoch.stoch().iloc[-1]
            d_raw = stoch.stoch_signal().iloc[-1]
            # Either value can be NaN (e.g. a flat price window, or - presumably -
            # the signal line still warming up); int(NaN) would raise, so the
            # pair is reported only when both are finite numbers.
            if math.isfinite(k_raw) and math.isfinite(d_raw):
                kd_values = f"{int(round(k_raw, 0))}/{int(round(d_raw, 0))}"

        recent = df.tail(252)['close']
        return {
            "symbol": self.symbol,
            "last_date": formatted_date,
            "last_close": round(last_close, 2),
            "change_pct": change_pct,
            "low_52": round(float(recent.min()), 2),
            "high_52": round(float(recent.max()), 2),
            "last_ema20": get_ema_offset(20),
            "last_ema50": get_ema_offset(50),
            "last_ema100": get_ema_offset(100),
            "last_ema200": get_ema_offset(200),
            "kd_values": kd_values
        }
class StrategyEngine:
    """Backtest Value Averaging (VA) against Dollar-Cost Averaging (DCA).

    The engine reads prices from the cache file of the ``DataEngine`` it is
    given (``data_engine.file_path``, columns ``date`` and ``close``).
    """

    def __init__(self, data_engine):
        """Keep a reference to the data engine and mirror its symbol.

        Args:
            data_engine: Object exposing ``symbol``, ``file_path`` and
                ``cache_dir`` (normally a ``DataEngine``).
        """
        self.data_engine = data_engine
        # The data engine has already normalised the symbol.
        self.symbol = data_engine.symbol

    def _find_file(self):
        """Locate the symbol's CSV in the data engine's cache directory.

        The upper-case file name is tried first, then the lower-case one.
        Returns the path found, or None when neither exists or the cache
        directory / symbol is unknown.
        """
        data_dir = getattr(self.data_engine, 'cache_dir', None)
        if not data_dir or not self.symbol:
            return None
        upper_path = os.path.join(data_dir, f"{self.symbol}.csv")
        lower_path = os.path.join(data_dir, f"{self.symbol.lower()}.csv")
        for candidate in (upper_path, lower_path):
            if os.path.exists(candidate):
                return candidate
        print(f"ERROR: Searched for {upper_path} AND {lower_path} - Neither found!")
        return None

    def load_data(self):
        """Load the cached CSV with lower-cased column names.

        When an ``adj close`` column is present it becomes the ``close``
        column; a raw ``close`` column is dropped in that case so the frame
        never ends up with two columns named ``close``.
        """
        df = pd.read_csv(self.data_engine.file_path)
        df.columns = [c.lower() for c in df.columns]
        if 'adj close' in df.columns:
            df = df.drop(columns=['close'], errors='ignore')
            df = df.rename(columns={'adj close': 'close'})
        return df

    def calculate_va_vs_dca(self, initial_inv, monthly_target, start_date,
                            allow_sell=True, allow_fractional=True, frequency="Monthly"):
        """Run the VA-vs-DCA backtest (argument order kept for older callers).

        Delegates to ``run_simulation`` so both entry points share one
        implementation and return the same row layout.

        Args:
            initial_inv: Amount invested by both strategies at the first step.
            monthly_target: Monthly contribution / target growth.
            start_date: First date to consider (anything ``pd.to_datetime`` accepts).
            allow_sell: Let VA sell when the holding is above target.
            allow_fractional: Allow fractional share counts.
            frequency: "Monthly" (default), "Weekly" or "Bi-Weekly".

        Returns:
            The list of per-step dicts produced by ``run_simulation``.
        """
        return self.run_simulation(
            start_date,
            monthly_target,
            initial_inv,
            frequency=frequency,
            allow_sell=allow_sell,
            allow_fractional=allow_fractional,
        )

    @staticmethod
    def _shares_for_cash(cash, price, allow_fractional):
        """Convert a cash amount (negative = sale) into a share count."""
        if price <= 0:
            return 0
        if allow_fractional:
            return float(cash / price)
        # Whole shares only: round toward zero so neither a purchase nor a sale
        # trades more shares than the cash amount covers.
        whole = math.floor(cash / price) if cash >= 0 else math.ceil(cash / price)
        return float(whole)

    @staticmethod
    def _pick_schedule(rows, start_dt, frequency, monthly_goal):
        """Choose the investment dates and the contribution per step.

        Args:
            rows: Non-empty price frame sorted by date, starting at ``start_dt``.
            start_dt: Timestamp whose day-of-month anchors the monthly schedule.
            frequency: Frequency label; case, spaces, hyphens and underscores
                are ignored. Anything other than weekly / bi-weekly is monthly.
            monthly_goal: Monthly contribution.

        Returns:
            ``(schedule_frame, step_increment)``; the schedule always ends on
            the most recent row of ``rows``.
        """
        freq_key = str(frequency).strip().lower()
        for filler in ('-', '_', ' '):
            freq_key = freq_key.replace(filler, '')

        if freq_key == "weekly":
            schedule = rows.iloc[::5].copy()  # every 5th row
            step_increment = float((monthly_goal * 12) / 52)
        elif freq_key == "biweekly":
            schedule = rows.iloc[::10].copy()  # every 10th row
            step_increment = float((monthly_goal * 12) / 26)
        else:
            anchor_day = start_dt.day
            picks = []
            for _, month_rows in rows.groupby(rows['date'].dt.to_period('M')):
                eligible = month_rows[month_rows['date'].dt.day >= anchor_day]
                # First row on/after the anchor day. A month with no such row
                # (short month, or the anchor has no data that month) falls
                # back to its last available row instead of being skipped.
                picks.append(eligible.iloc[[0]] if not eligible.empty else month_rows.iloc[[-1]])
            schedule = pd.concat(picks)
            step_increment = float(monthly_goal)

        # Make sure the most recent price is the final step.
        schedule = schedule.reset_index(drop=True)
        latest_row = rows.iloc[[-1]]
        if schedule.empty or latest_row.iloc[0]['date'] != schedule.iloc[-1]['date']:
            schedule = pd.concat([schedule, latest_row]).drop_duplicates(subset=['date'])
        schedule = schedule.sort_values('date').reset_index(drop=True)
        return schedule, step_increment

    def run_simulation(self, start_date, monthly_goal, initial_inv, frequency="Monthly",
                       allow_sell=True, allow_fractional=True):
        """Simulate VA and DCA side by side from ``start_date`` onward.

        Step 0 invests ``initial_inv`` in both strategies. At every later
        step DCA invests the fixed per-step amount, while VA raises its
        target by that amount and buys (or sells, if ``allow_sell``) the
        difference between the target and the current holding value.

        Args:
            start_date: First date to consider.
            monthly_goal: Monthly contribution; scaled to 12/52 or 12/26 of
                itself per step for weekly / bi-weekly schedules.
            initial_inv: Amount invested at the first step.
            frequency: "Monthly", "Weekly" or "Bi-Weekly".
            allow_sell: When False, VA invests 0 instead of selling.
            allow_fractional: When False, share counts are whole numbers.

        Returns:
            One dict per step (date, price and per-strategy invested amount,
            shares and value); an empty list when no data exists on or after
            ``start_date``.
        """
        df = pd.read_csv(self.data_engine.file_path)
        df['date'] = pd.to_datetime(df['date'])
        # Rows without a date or price cannot be simulated.
        df = df.dropna(subset=['date', 'close']).sort_values('date')

        start_dt_obj = pd.to_datetime(start_date)
        df_from_start = df[df['date'] >= start_dt_obj].copy()
        if df_from_start.empty:
            return []

        final_df, step_increment = self._pick_schedule(
            df_from_start, start_dt_obj, frequency, monthly_goal
        )

        va_shares, dca_shares = 0.0, 0.0
        va_invested, dca_invested = 0.0, 0.0
        va_target_value = 0.0
        history = []

        for step, (when, close) in enumerate(zip(final_df['date'], final_df['close'])):
            price = float(close)

            if step == 0:
                va_actual_inv = float(initial_inv)
                dca_actual_inv = float(initial_inv)
                va_target_value = float(initial_inv)
            else:
                va_target_value += step_increment
                dca_actual_inv = float(step_increment)
                # Gap between the target and the holding's value before this trade.
                diff = va_target_value - (va_shares * price)
                va_actual_inv = diff if (diff >= 0 or allow_sell) else 0.0

            va_new_shares = self._shares_for_cash(va_actual_inv, price, allow_fractional)
            dca_new_shares = self._shares_for_cash(dca_actual_inv, price, allow_fractional)

            va_shares += va_new_shares
            dca_shares += dca_new_shares
            va_invested += va_actual_inv
            dca_invested += dca_actual_inv

            history.append({
                "date": when.strftime('%Y-%m-%d'),
                "price": round(price, 2),
                "va_diff": round(va_actual_inv, 2),
                "va_shares_trans": round(va_new_shares, 4),
                "va_value": round(va_shares * price, 2),
                "va_invested": round(va_invested, 2),
                "va_shares_total": round(va_shares, 4),
                "va_target_value": round(float(va_target_value), 2),
                "dca_diff": round(dca_actual_inv, 2),
                "dca_shares_trans": round(dca_new_shares, 4),
                "dca_value": round(dca_shares * price, 2),
                "dca_invested": round(dca_invested, 2),
                "dca_shares_total": round(dca_shares, 4)
            })
        return history