Files
historical-prices/engine.py
T

705 lines
29 KiB
Python

import pandas as pd
import requests
import os
import csv
import shutil
from datetime import datetime, time
import yfinance as yf
from ta.trend import EMAIndicator
from ta.momentum import StochasticOscillator
import math
class DataEngine:
def __init__(self, symbol=None, url=None, provider=None, data_dir='data_cache'):
# 1. Clean the incoming symbol
self.symbol = symbol.strip().upper() if symbol else None
# 2. Setup centralized paths
base_path = os.path.dirname(os.path.abspath(__file__))
self.cache_dir = os.path.join(base_path, data_dir)
os.makedirs(self.cache_dir, exist_ok=True)
# 3. Load the master instrument list to find URLs/Providers
# This ensures the engine knows where to go for special tickers
self.master_instruments = self.load_instruments_from_csv('instruments.csv')
# 4. Find config from master list or use passed-in arguments
instrument_config = next((i for i in self.master_instruments if i['symbol'] == self.symbol), None)
if instrument_config:
self.url = instrument_config['url']
self.provider = instrument_config['provider']
else:
# Fallback to arguments if ticker isn't in the CSV list
self.url = url
self.provider = provider or 'yahoo'
# 5. Define final file path for centralized storage
if self.symbol:
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
else:
self.file_path = None
self.ensure_data()
def ensure_data(self):
"""Checks if file exists; if not, downloads it."""
if os.path.exists(self.file_path):
return True # Data is already there
print(f"DEBUG: {self.symbol} not found in cache. Attempting download...")
try:
# For a generic ticker like SPY, we use yfinance
import yfinance as yf
df = yf.download(self.symbol, period="max")
if df.empty:
print(f"ERROR: No data found for {self.symbol}")
return False
# Clean and save
# 1. If columns are MultiIndex (tuples), take just the first level (the price name)
if isinstance(df.columns, pd.MultiIndex):
df.columns = df.columns.get_level_values(0)
# 2. Reset index to turn 'Date' into a column
df.reset_index(inplace=True)
# 3. Now it is safe to lowercase the column names
df.columns = [str(c).lower() for c in df.columns]
df.to_csv(self.file_path, index=False)
print(f"DEBUG: Successfully cached {self.symbol}")
return True
except Exception as e:
print(f"ERROR: Download failed for {self.symbol}: {e}")
return False
def load_instruments_from_csv(self, file_path):
instruments = []
TEMPLATES = {
'jpm': "https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={cusip}&country=hk&role=per",
'yahoo': "https://query1.finance.yahoo.com/v8/finance/chart/{cusip}?period1=0&period2=9999999999&interval=1d&events=history",
'agi': "https://markets.ft.com/data/funds/tearsheet/historical?s={cusip}"
}
try:
# Get absolute path relative to this script
abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
if not os.path.exists(abs_path):
print(f"Error: Master list {file_path} not found at {abs_path}")
return []
with open(abs_path, mode='r', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile)
# Clean header names (lowercase + remove whitespace)
reader.fieldnames = [name.strip().lower() for name in reader.fieldnames]
for row in reader:
# Use .get() with fallback to avoid KeyErrors
symbol = (row.get('symbol') or '').strip().upper()
cusip = (row.get('cusip') or '').strip()
provider = (row.get('provider') or 'jpm').strip().lower()
if symbol and cusip:
template = TEMPLATES.get(provider, TEMPLATES['jpm'])
url = template.format(cusip=cusip)
instruments.append({
"symbol": symbol,
"url": url,
"provider": provider,
"cusip": cusip
})
except Exception as e:
print(f"CRITICAL: Failed to load instruments.csv: {e}")
return instruments
def _ensure_data_exists(self):
if not os.path.exists(self.file_path):
# Check if this symbol exists in our master CSV mapping
match = next((i for i in self.instruments if i['symbol'].upper() == self.symbol), None)
if match:
print(f"DEBUG: Found {self.symbol} in master list. Fetching from {match['provider']}...")
self._download_from_provider(match)
else:
print(f"DEBUG: {self.symbol} not in master list. Trying generic Yahoo Finance...")
self._download_generic_yahoo()
def _download_generic_yahoo(self):
"""Standard yfinance fallback"""
try:
df = yf.download(self.symbol, period="max")
if not df.empty:
df.reset_index(inplace=True)
df.columns = [c.lower() for c in df.columns]
df.to_csv(self.file_path, index=False)
except Exception as e:
print(f"Yahoo fallback failed: {e}")
def global_sync(self):
"""Backup, Sync all instruments, and return a summary report."""
# 1. Run Maintenance/Backup
self.run_pre_sync_maintenance()
# FIX 1: Add 'self.' so it calls the method inside this class
instruments = self.load_instruments_from_csv('instruments.csv')
report = {
"total": len(instruments),
"updated": 0,
"failed": 0,
"details": []
}
for item in instruments:
try:
self.symbol = item['symbol']
self.provider = item['provider']
self.url = item['url']
# FIX 2: Use 'self.cache_dir' to match your __init__ logic
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
print(f"Updating {self.symbol}...")
# fetch_data now returns the updated DataFrame or None
result_df = self.fetch_data()
time.sleep(1)
if result_df is not None and not result_df.empty:
report["updated"] += 1
last_price = result_df['close'].iloc[-1]
report["details"].append(f"{self.symbol}: Updated (Price: {last_price})")
else:
report["failed"] += 1
report["details"].append(f"{self.symbol}: No new data found")
except Exception as e:
report["failed"] += 1
report["details"].append(f"⚠️ {self.symbol}: Error ({str(e)})")
return report
def run_pre_sync_maintenance(self):
"""Backs up files and reports current data health."""
import os
import shutil
import pandas as pd
from datetime import datetime
# 1. Setup paths correctly
base_dir = os.path.dirname(os.path.abspath(__file__))
backup_dir = os.path.join(base_dir, 'backups')
# 2. Create the timestamped folder path FIRST
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
current_backup_path = os.path.join(backup_dir, f"sync_backup_{timestamp}")
# 3. Create the directories (safety-first)
os.makedirs(current_backup_path, exist_ok=True)
print(f"\n--- Pre-Sync Health Check ({timestamp}) ---")
stats = []
# 4. Check if cache exists to avoid errors
if not os.path.exists(self.cache_dir):
print(f"⚠️ Cache directory not found at {self.cache_dir}")
return pd.DataFrame()
# 5. Backup loop
for filename in os.listdir(self.cache_dir):
if filename.endswith(".csv"):
src = os.path.join(self.cache_dir, filename)
dst = os.path.join(current_backup_path, filename)
try:
# Perform copy
shutil.copy2(src, dst)
# Read data for health check
df = pd.read_csv(src)
# Store stats
stats.append({
"Fund": filename.replace(".csv", ""),
"Rows": len(df),
"Start": df['date'].min() if 'date' in df.columns else "N/A",
"End": df['date'].max() if 'date' in df.columns else "N/A"
})
print(f"📦 Backed up: {filename} ({len(df)} rows)")
except Exception as e:
print(f"⚠️ Could not backup {filename}: {e}")
continue
# 6. Display and return report
if stats:
stats_df = pd.DataFrame(stats)
print("\n" + stats_df.to_string(index=False))
print(f"\n✅ All backups saved to: {current_backup_path}")
return stats_df
else:
print("📭 No CSV files found to backup.")
return pd.DataFrame()
def _parse_jpm(self, json_data):
if isinstance(json_data, dict) and "historicalNAVList" in json_data:
df = pd.DataFrame(json_data["historicalNAVList"])
return df.rename(columns={'navPrice': 'close', 'date': 'date'})
return None
def _parse_ft_html(self, html_text):
try:
# 1. Use BeautifulSoup to handle the nested spans in the Date column
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_text, 'html.parser')
# Find the specific results table
table = soup.find('table', class_='mod-tearsheet-historical-prices__results')
if not table:
print(f"❌ Could not find the results table in the HTML for {self.symbol}")
return None
data = []
rows = table.find('tbody').find_all('tr')
for row in rows:
cols = row.find_all('td')
if len(cols) >= 5:
# The Date cell has two spans. We'll take the first one (Full date).
date_cell = cols[0].find('span', class_='mod-ui-hide-small-below')
date_str = date_cell.get_text(strip=True) if date_cell else cols[0].get_text(strip=True)
# The Close price is usually the 5th column (index 4)
close_str = cols[4].get_text(strip=True).replace(',', '')
data.append({
'date': date_str,
'close': close_str
})
# 2. Convert to DataFrame
df = pd.DataFrame(data)
if df.empty:
return None
# 3. Final Type Conversion
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df['close'] = pd.to_numeric(df['close'], errors='coerce')
return df.dropna().sort_values('date').reset_index(drop=True)
except Exception as e:
print(f"❌ Failed to parse FT HTML structure: {e}")
return None
def _parse_yahoo(self, json_data):
"""Parses Yahoo Finance v8 Chart JSON"""
try:
chart = json_data['chart']['result'][0]
timestamps = chart['timestamp']
indicators = chart['indicators']['quote'][0]
# Use adjclose if available, otherwise close
closes = indicators.get('close', [])
df = pd.DataFrame({
'date': pd.to_datetime(timestamps, unit='s'),
'close': closes
})
return df
except:
return None
def fetch_data(self):
local_df = pd.DataFrame()
new_df = None
# 1. Load Local Cache & Force Date Type
if os.path.exists(self.file_path):
try:
local_df = pd.read_csv(self.file_path)
local_df = local_df.loc[:, ~local_df.columns.duplicated()].copy()
local_df.columns = [c.lower().strip() for c in local_df.columns]
local_df = local_df.rename(columns={'price': 'close', 'nav': 'close'})
# FORCE CONVERSION: This fixes the '<' error
# errors='coerce' turns bad text into NaT (Not a Time), which we then drop
local_df['date'] = pd.to_datetime(local_df['date'], errors='coerce')
local_df = local_df.dropna(subset=['date']).reset_index(drop=True)
except Exception as e:
print(f"Local Load Error: {e}")
# 2. Network Fetch
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
response = requests.get(self.url, headers=headers, timeout=15)
response.raise_for_status()
if self.provider == 'agi':
new_df = self._parse_ft_html(response.text)
elif self.provider == 'jpm':
new_df = self._parse_jpm(response.json())
elif self.provider == 'yahoo':
new_df = self._parse_yahoo(response.json())
# 3. Safe Merge & Sort
if new_df is not None and not new_df.empty:
# Force new_df dates to match local_df format
new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce')
combined_df = pd.concat([local_df, new_df], ignore_index=True)
combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
# SORTING: Now safe because all types are Timestamps
combined_df = combined_df.sort_values('date').reset_index(drop=True)
if 'close' in combined_df.columns:
final_df = combined_df[['date', 'close']].dropna()
final_df.to_csv(self.file_path, index=False)
return final_df
return local_df
except Exception as e:
print(f"Network error for {self.symbol}: {e}")
return local_df
def get_local_metrics(self):
"""Reads ONLY from local CSV and returns metrics immediately."""
if not os.path.exists(self.file_path):
return {"error": "Missing Local Data", "status": "needs_sync"}
try:
df = pd.read_csv(self.file_path)
# Ensure columns are clean
df.columns = [c.lower().strip() for c in df.columns]
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df = df.dropna(subset=['date', 'close']).sort_values('date')
# Pass this local dataframe to your existing calculation function
return self.calculate_table_metrics(df)
except Exception as e:
print(f"Error reading local data for {self.symbol}: {e}")
return None
def calculate_table_metrics(self, df):
if df is None or df.empty or len(df) < 2:
return None
last_close = float(df.iloc[-1]['close'])
prev_close = float(df.iloc[-2]['close'])
change_pct = ((last_close - prev_close) / prev_close) * 100
count = len(df)
def get_ema_offset(window):
if count >= window:
ema = EMAIndicator(close=df['close'], window=window).ema_indicator().iloc[-1]
return round(((last_close / ema) * 100) - 100, 1)
return "N/A"
k_val = d_val = "N/A"
if count >= 14:
high_14 = df['close'].rolling(window=14).max()
low_14 = df['close'].rolling(window=14).min()
stoch = StochasticOscillator(high=high_14, low=low_14, close=df['close'], window=14)
k_val = round(stoch.stoch().iloc[-1], 0)
d_val = round(stoch.stoch_signal().iloc[-1], 0)
return {
"last_close": round(last_close, 2),
"change_pct": round(change_pct, 2),
"low_52": round(float(df.tail(252)['close'].min()), 2),
"high_52": round(float(df.tail(252)['close'].max()), 2),
"last_ema20": get_ema_offset(20),
"last_ema50": get_ema_offset(50),
"last_ema100": get_ema_offset(100),
"last_ema200": get_ema_offset(200),
"kd_values": f"{k_val}/{d_val}" if k_val != "N/A" else "N/A"
}
class StrategyEngine:
"""
Handles financial strategy simulations and backtesting.
This class takes a DataEngine instance to access files.
"""
def __init__(self, data_engine):
# 1. Save the engine object (The 'Supplier')
self.data_engine = data_engine
# 2. Extract the symbol from the supplier so the chef knows the name
# We don't need .strip() here because DataEngine already did it!
self.symbol = data_engine.symbol
def _find_file(self):
# Try the uppercase version first
upper_path = os.path.join(self.data_dir, f"{self.symbol}.csv")
# Try the lowercase version second
lower_path = os.path.join(self.data_dir, f"{self.symbol.lower()}.csv")
if os.path.exists(upper_path):
return upper_path
elif os.path.exists(lower_path):
return lower_path
# If neither exists, print a very specific message to your terminal
print(f"ERROR: Searched for {upper_path} AND {lower_path} - Neither found!")
return None
def load_data(self):
df = pd.read_csv(self.file_path)
# Standardize column names to lowercase to avoid 'Price' vs 'price' issues
df.columns = [c.lower() for c in df.columns]
# Map common variations to a single 'price' column
if 'adj close' in df.columns:
df = df.rename(columns={'adj close': 'close'})
elif 'close' in df.columns:
df = df.rename(columns={'close': 'close'})
return df
def calculate_va_vs_dca(self, initial_inv, monthly_target, start_date, allow_sell=True, allow_fractional=True):
import math
# 1. Load and Prepare Data
df = pd.read_csv(self.data_engine.file_path)
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values('date')
# 2. Identify the "Anchor Day" and the "Absolute Latest Day"
start_dt_obj = pd.to_datetime(start_date)
anchor_day = start_dt_obj.day
latest_csv_date = df['date'].max() # This captures 2026-01-27
# --- 3. Filter data starting from your start_date ---
df_from_start = df[df['date'] >= start_dt_obj].copy()
if df_from_start.empty:
return []
# --- 4. Choose rows based on Frequency ---
if frequency == "Weekly":
# Take every 5th trading day
final_df = df_from_start.iloc[::5].copy()
step_increment = float((monthly_goal * 12) / 52)
elif frequency == "Bi-Weekly":
# Take every 10th trading day
final_df = df_from_start.iloc[::10].copy()
step_increment = float((monthly_goal * 12) / 26)
else: # Default to Monthly
# Group by year/month and take the first available day >= anchor_day
final_df = df_from_start[df_from_start['date'].dt.day >= anchor_day].groupby([
df_from_start['date'].dt.year,
df_from_start['date'].dt.month
], as_index=False).first()
step_increment = float(monthly_goal)
# --- 5. FORCE LAST ROW SAFETY ---
# Reset index to make concatenation clean
final_df = final_df.reset_index(drop=True)
if not df_from_start.empty:
latest_row = df_from_start.iloc[[-1]] # Gets the absolute last available day
if final_df.empty or latest_row.iloc[0]['date'] != final_df.iloc[-1]['date']:
final_df = pd.concat([final_df, latest_row]).drop_duplicates(subset=['date'])
# Sort and final index reset before the loop
final_df = final_df.sort_values('date').reset_index(drop=True)
if final_df.empty:
return []
# --- 6. Helper for share calculation ---
def get_shares(cash, prc):
if prc <= 0: return 0
# Note: allow_fractional should be defined in your class/scope
return cash / prc if allow_fractional else math.floor(cash / prc)
# --- 7. Initial Setup ---
va_shares = 0
dca_shares = 0
va_invested = 0
dca_invested = 0
va_target_value = 0
history = []
# --- 8. Strategy Loop ---
for step, (idx, row) in enumerate(final_df.iterrows()):
actual_date_str = row['date'].strftime('%Y-%m-%d')
price = float(row['close'])
if step == 0:
# --- STEP 0: INITIAL DEPOSIT ---
actual_inv = initial_inv
dca_actual_inv = initial_inv
va_target_value = initial_inv
va_new_shares = get_shares(actual_inv, price)
dca_new_shares = va_new_shares
else:
# --- STEP 1+: DVA vs DCA ---
# Use step_increment (calculated in your frequency logic)
# to ensure growth matches frequency (Weekly/Monthly)
current_increment = step_increment
# DCA Logic
dca_actual_inv = current_increment
dca_new_shares = get_shares(dca_actual_inv, price)
# DVA Logic (Fixed Value Path)
va_target_value += current_increment
# Gap calculation: Target vs. current value BEFORE this step's investment
current_va_val_pre = va_shares * price
diff = va_target_value - current_va_val_pre
# Apply Buy/Sell constraints
# note: allow_sell should be defined in your class/scope
actual_inv = diff if (diff >= 0 or allow_sell) else 0
va_new_shares = get_shares(actual_inv, price)
# --- STATE UPDATES ---
va_shares += va_new_shares
dca_shares += dca_new_shares
va_invested += actual_inv
dca_invested += dca_actual_inv
# --- Unified History Append ---
history.append({
"date": actual_date_str,
"price": round(price, 2),
"dca_value": round(dca_shares * price, 2),
"dca_invested": round(dca_invested, 2),
"dca_shares_trans": round(dca_new_shares, 4),
"dca_shares_total": round(dca_shares, 4),
"va_value": round(va_shares * price, 2),
"va_invested": round(va_invested, 2),
"va_diff": round(actual_inv, 2),
"va_shares_trans": round(va_new_shares, 4),
"va_shares_total": round(va_shares, 4),
"va_target_value": round(va_target_value, 2)
})
return history
def run_simulation(self, start_date, monthly_goal, initial_inv, frequency="Monthly", allow_sell=True, allow_fractional=True):
# 1. DATA LOADING
df = pd.read_csv(self.data_engine.file_path)
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values('date')
# 2. DATE PREP
start_dt_obj = pd.to_datetime(start_date)
latest_csv_date = df['date'].max()
# Filter data starting from user's choice
df_from_start = df[df['date'] >= start_dt_obj].copy()
if df_from_start.empty:
return []
# 3. FREQUENCY LOGIC (Hardened)
# Standardize string for comparison
freq_check = str(frequency).strip().title()
if freq_check == "Weekly":
final_df = df_from_start.iloc[::5].copy()
step_increment = float((monthly_goal * 12) / 52)
elif freq_check == "Bi-Weekly":
final_df = df_from_start.iloc[::10].copy()
step_increment = float((monthly_goal * 12) / 26)
else:
# For Monthly, we need the anchor day
anchor_day = start_dt_obj.day
final_df = df_from_start[df_from_start['date'].dt.day >= anchor_day].groupby([
df_from_start['date'].dt.year,
df_from_start['date'].dt.month
], as_index=False).first()
step_increment = float(monthly_goal)
# 4. SAFETY: Ensure most recent price is included
final_df = final_df.reset_index(drop=True)
last_actual_row = df_from_start.iloc[[-1]]
if final_df.empty or last_actual_row.iloc[0]['date'] != final_df.iloc[-1]['date']:
final_df = pd.concat([final_df, last_actual_row]).drop_duplicates(subset=['date'])
final_df = final_df.sort_values('date').reset_index(drop=True)
# 5. STRATEGY INITIALIZATION
def get_shares(cash, prc):
if prc <= 0: return 0
if allow_fractional:
return float(cash / prc)
else:
# Handles both buying (+) and selling (-) for whole shares
return float(math.floor(cash / prc)) if cash >= 0 else float(math.ceil(cash / prc))
# Ensure these are initialized as floats
va_shares, dca_shares = 0.0, 0.0
va_invested, dca_invested = 0.0, 0.0
history = []
for step, (idx, row) in enumerate(final_df.iterrows()):
actual_date_str = row['date'].strftime('%Y-%m-%d')
price = float(row['close'])
if step == 0:
# First row: Both strategies start with the Initial Investment
va_actual_inv = float(initial_inv)
dca_actual_inv = float(initial_inv)
va_target_value = float(initial_inv)
else:
# Subsequent rows: Use the frequency-adjusted step_increment
va_target_value += step_increment
# DCA logic: Always invests the same amount every period
dca_actual_inv = float(step_increment)
# VA logic: Invests enough to hit the target value
current_va_market_val = va_shares * price
diff = va_target_value - current_va_market_val
# Apply "Allow Sell" constraint
va_actual_inv = diff if (diff >= 0 or allow_sell) else 0.0
# Update Shares based on fractional setting
va_new_shares = get_shares(va_actual_inv, price)
dca_new_shares = get_shares(dca_actual_inv, price)
# Running totals for shares
va_shares += va_new_shares
dca_shares += dca_new_shares
# Running totals for principal invested
va_invested += va_actual_inv
dca_invested += dca_actual_inv
history.append({
"date": actual_date_str,
"price": round(price, 2),
"va_diff": round(va_actual_inv, 2), # Invested this step (VA)
"va_shares_trans": round(va_new_shares, 4),
"va_value": round(va_shares * price, 2), # Current Portfolio Value (VA)
"va_invested": round(va_invested, 2), # Total Out-of-Pocket (VA)
"va_shares_total": round(va_shares, 4),
"va_target_value": round(float(va_target_value), 2),
"dca_diff": round(dca_actual_inv, 2), # Invested this step (DCA)
"dca_shares_trans": round(dca_new_shares, 4),
"dca_value": round(dca_shares * price, 2), # Current Portfolio Value (DCA)
"dca_invested": round(dca_invested, 2), # Total Out-of-Pocket (DCA)
"dca_shares_total": round(dca_shares, 4)
})
return history