import pandas as pd import requests import os import csv import shutil import time from datetime import datetime import yfinance as yf from ta.trend import EMAIndicator from ta.momentum import StochasticOscillator import math class DataEngine: def __init__(self, symbol=None, name=None, url=None, provider=None, data_dir='data_cache'): # 1. Basics first self.symbol = symbol.strip().upper() if symbol else None self.name = name # --- Fetch frequently --- self.cache_expiry = 1 * 3600 # Set to 0 for forced refresh, 24 for normal # 2. Setup the directory variable FIRST # (This was likely below the file_path line, causing the crash) base_path = os.path.dirname(os.path.abspath(__file__)) self.cache_dir = os.path.join(base_path, data_dir) os.makedirs(self.cache_dir, exist_ok=True) # 3. Now it is safe to define file_path because cache_dir exists self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv") if self.symbol else None # 4. Load instruments and resolve the rest self.master_instruments = self.load_instruments_from_csv('instruments.csv') # 4. Resolve Config: Find this symbol in your CSV config = next((i for i in self.master_instruments if i['symbol'].upper() == self.symbol), None) if config: self.provider = config.get('provider', 'yahoo').lower() # BUILD URL DYNAMICALLY based on Symbol + Provider if self.provider == 'jpm': self.url = f"https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={self.symbol}&country=hk&role=per" elif self.provider == 'ft': self.url = f"https://markets.ft.com/data/funds/tearsheet/historical?s={self.symbol}" else: self.url = f"https://query1.finance.yahoo.com/v8/finance/chart/{self.symbol}?interval=1d&range=5y" else: # Fallback for symbols not in your CSV self.url = f"https://query1.finance.yahoo.com/v8/finance/chart/{self.symbol}?interval=1d&range=5y" if self.symbol else None self.provider = 'yahoo' # 5. Define file path and auto-sync self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv") if self.symbol else None def ensure_data(self): """Checks if file exists and is fresh (less than 24h old).""" # Force expiry to 0 CACHE_EXPIRY = 0 if os.path.exists(self.file_path): # By changing this to 'if False', we force it to ignore the cache every time if False: # file_age < CACHE_EXPIRY: return True else: print(f"DEBUG: {self.symbol} refreshing now...") # This calls your actual downloader return self.fetch_data() def load_instruments_from_csv(self, file_path='instruments.csv'): instruments = [] # Templates use {id} as a generic placeholder TEMPLATES = { 'jpm': "https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={id}&country=hk&role=per", 'yahoo': "https://query1.finance.yahoo.com/v8/finance/chart/{id}?period1=0&period2=9999999999&interval=1d&events=history", 'ft': "https://markets.ft.com/data/funds/tearsheet/historical?s={id}", 'agi': "https://markets.ft.com/data/funds/tearsheet/historical?s={id}" } try: abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path) if not os.path.exists(abs_path): return [] with open(abs_path, mode='r', encoding='utf-8-sig') as csvfile: reader = csv.DictReader(csvfile) reader.fieldnames = [n.strip().lower() for n in reader.fieldnames] for row in reader: # Use whatever identifier is available uid = (row.get('cusip') or row.get('symbol') or row.get('ticker') or '').strip() symbol = (row.get('symbol') or row.get('ticker') or row.get('cusip') or '').strip().upper() provider = (row.get('provider') or 'yahoo').strip().lower() if symbol and uid: template = TEMPLATES.get(provider, TEMPLATES['yahoo']) instruments.append({ "symbol": symbol, "url": template.format(id=uid), "provider": provider, "name": row.get('name', symbol) }) except Exception as e: print(f"❌ Critical Load Error: {e}") return instruments def _ensure_data_exists(self): if not os.path.exists(self.file_path): # Check if this symbol exists in our master CSV mapping match = next((i for i in self.instruments if i['symbol'].upper() == self.symbol), None) if match: print(f"DEBUG: Found {self.symbol} in master list. Fetching from {match['provider']}...") self._download_from_provider(match) else: print(f"DEBUG: {self.symbol} not in master list. Trying generic Yahoo Finance...") self._download_generic_yahoo() def _download_generic_yahoo(self): """Standard yfinance fallback""" try: df = yf.download(self.symbol, period="max") if not df.empty: df.reset_index(inplace=True) df.columns = [c.lower() for c in df.columns] df.to_csv(self.file_path, index=False) except Exception as e: print(f"Yahoo fallback failed: {e}") def global_sync(self): """Backup, Sync all instruments, and return a summary report.""" # 1. Run Maintenance/Backup self.run_pre_sync_maintenance() # FIX 1: Add 'self.' so it calls the method inside this class instruments = self.load_instruments_from_csv('instruments.csv') report = { "total": len(instruments), "updated": 0, "failed": 0, "details": [] } for item in instruments: try: self.symbol = item['symbol'] self.provider = item['provider'] if self.provider == 'jpm': self.url = f"https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={self.symbol}&country=hk&role=per" elif self.provider == 'ft': self.url = f"https://markets.ft.com/data/funds/tearsheet/historical?s={self.symbol}" else: self.url = f"https://query1.finance.yahoo.com/v8/finance/chart/{self.symbol}?interval=1d&range=5y" # FIX 2: Use 'self.cache_dir' to match your __init__ logic self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv") print(f"Updating {self.symbol}...") # fetch_data now returns the updated DataFrame or None result_df = self.fetch_data() time.sleep(1) if result_df is not None and not result_df.empty: report["updated"] += 1 last_price = result_df['close'].iloc[-1] report["details"].append(f"✅ {self.symbol}: Updated (Price: {last_price})") else: report["failed"] += 1 report["details"].append(f"❌ {self.symbol}: No new data found") except Exception as e: report["failed"] += 1 report["details"].append(f"⚠️ {self.symbol}: Error ({str(e)})") return report def run_pre_sync_maintenance(self): """Backs up files and keeps only the 5 most recent backup folders.""" base_dir = os.path.dirname(os.path.abspath(__file__)) backup_root = os.path.join(base_dir, 'backups') # 1. Create timestamped folder timestamp = datetime.now().strftime("%Y%m%d_%H%M") current_backup_path = os.path.join(backup_root, f"sync_backup_{timestamp}") os.makedirs(current_backup_path, exist_ok=True) # 2. Perform Backup if os.path.exists(self.cache_dir): files = [f for f in os.listdir(self.cache_dir) if f.endswith('.csv')] for filename in files: shutil.copy2( os.path.join(self.cache_dir, filename), os.path.join(current_backup_path, filename) ) print(f"✅ Backed up {len(files)} files to {current_backup_path}") # 3. Cleanup: Keep only last 5 backup folders all_backups = sorted([ os.path.join(backup_root, d) for d in os.listdir(backup_root) if os.path.isdir(os.path.join(backup_root, d)) ], key=os.path.getmtime) while len(all_backups) > 5: oldest = all_backups.pop(0) shutil.rmtree(oldest) print(f"🧹 Storage Cleanup: Removed old backup {os.path.basename(oldest)}") def _parse_jpm(self, json_data): if isinstance(json_data, dict) and "historicalNAVList" in json_data: df = pd.DataFrame(json_data["historicalNAVList"]) return df.rename(columns={'navPrice': 'close', 'date': 'date'}) return None def _parse_ft_html(self, html_text): try: # 1. Use BeautifulSoup to handle the nested spans in the Date column from bs4 import BeautifulSoup soup = BeautifulSoup(html_text, 'html.parser') # Find the specific results table table = soup.find('table', class_='mod-tearsheet-historical-prices__results') if not table: print(f"❌ Could not find the results table in the HTML for {self.symbol}") return None data = [] rows = table.find('tbody').find_all('tr') for row in rows: cols = row.find_all('td') if len(cols) >= 5: # The Date cell has two spans. We'll take the first one (Full date). date_cell = cols[0].find('span', class_='mod-ui-hide-small-below') date_str = date_cell.get_text(strip=True) if date_cell else cols[0].get_text(strip=True) # The Close price is usually the 5th column (index 4) close_str = cols[4].get_text(strip=True).replace(',', '') data.append({ 'date': date_str, 'close': close_str }) # 2. Convert to DataFrame df = pd.DataFrame(data) if df.empty: return None # 3. Final Type Conversion df['date'] = pd.to_datetime(df['date'], errors='coerce') df['close'] = pd.to_numeric(df['close'], errors='coerce') return df.dropna().sort_values('date').reset_index(drop=True) except Exception as e: print(f"❌ Failed to parse FT HTML structure: {e}") return None def _parse_yahoo(self, json_data): """Parses Yahoo Finance v8 Chart JSON""" try: chart = json_data['chart']['result'][0] timestamps = chart['timestamp'] indicators = chart['indicators']['quote'][0] # Use adjclose if available, otherwise close closes = indicators.get('close', []) df = pd.DataFrame({ 'date': pd.to_datetime(timestamps, unit='s'), 'close': closes }) return df except: return None def fetch_data(self): local_df = pd.DataFrame() CACHE_EXPIRY = 0 file_exists = os.path.exists(self.file_path) # 1. Load Local Cache & Check Age needs_refresh = True if file_exists: try: local_df = pd.read_csv(self.file_path) local_df['date'] = pd.to_datetime(local_df['date'], errors='coerce') file_age = time.time() - os.path.getmtime(self.file_path) if file_age < CACHE_EXPIRY: needs_refresh = False print(f"🚀 Using Cache: {self.symbol} ({round(file_age/3600, 1)}h old).") except Exception as e: print(f"⚠️ Cache read error for {self.symbol}: {e}") # 2. Network Fetch if needs_refresh: try: if not self.url or str(self.url).lower() == 'none': print(f"❌ No URL found for {self.symbol}.") return local_df print(f"📡 Syncing {self.symbol} from {self.provider}...") headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} response = requests.get(self.url, headers=headers, timeout=15) if response.status_code == 200: new_df = None # --- PROVIDER ROUTING --- # ROUTING TO PARSERS if self.provider == 'yahoo': new_df = self._parse_yahoo(response.json()) elif self.provider == 'jpm': new_df = self._parse_jpm(response.json()) elif self.provider in ['agi', 'ft']: new_df = self._parse_ft_html(response.text) # --- MERGE & SAVE --- if new_df is not None and not new_df.empty: new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce') combined_df = pd.concat([local_df, new_df], ignore_index=True) combined_df = combined_df.sort_values('date').drop_duplicates(subset=['date'], keep='last') final_df = combined_df.dropna(subset=['date', 'close']) final_df[['date', 'close']].to_csv(self.file_path, index=False) print(f"✅ {self.symbol} updated to {final_df['date'].max().date()}") return final_df else: print(f"⚠️ Could not parse data for {self.symbol} via {self.provider}") else: print(f"⚠️ {self.provider} returned status {response.status_code}") except Exception as e: print(f"❌ Sync failed for {self.symbol}: {e}") return local_df def get_local_metrics(self): """Reads ONLY from local CSV and returns metrics immediately.""" if not os.path.exists(self.file_path): return {"error": "Missing Local Data", "status": "needs_sync"} try: df = pd.read_csv(self.file_path) # Ensure columns are clean df.columns = [c.lower().strip() for c in df.columns] df['date'] = pd.to_datetime(df['date'], errors='coerce') df = df.dropna(subset=['date', 'close']).sort_values('date') # Pass this local dataframe to your existing calculation function return self.calculate_table_metrics(df) except Exception as e: print(f"Error reading local data for {self.symbol}: {e}") return None def calculate_table_metrics(self, df): if df is None or df.empty or len(df) < 2: return None # Get the last row for price and date last_row = df.iloc[-1] last_close = float(last_row['close']) # 1. Extract and format the date # Handles both datetime objects and string dates last_date = last_row['date'] if hasattr(last_date, 'strftime'): formatted_date = last_date.strftime('%Y-%m-%d') else: formatted_date = str(last_date).split(' ')[0] # Fallback for strings prev_close = float(df.iloc[-2]['close']) change_pct = ((last_close - prev_close) / prev_close) * 100 count = len(df) # --- NEW INDICATORS START --- # Bollinger Bands %B (20-day) bb_pct = "N/A" if count >= 20: from ta.volatility import BollingerBands indicator_bb = BollingerBands(close=df['close'], window=20, window_dev=2) m_avg = indicator_bb.bollinger_mavg().iloc[-1] h_band = indicator_bb.bollinger_hband().iloc[-1] l_band = indicator_bb.bollinger_lband().iloc[-1] # Calculate %B: where is price relative to bands? if (h_band - l_band) != 0: bb_pct = round((last_close - l_band) / (h_band - l_band), 2) # RSI (14-day) rsi_val = "N/A" if count >= 14: from ta.momentum import RSIIndicator rsi_val = round(RSIIndicator(close=df['close'], window=14).rsi().iloc[-1], 1) # Z-Score Helper Function def get_z_score(window): if count >= window: rolling_mean = df['close'].rolling(window=window).mean() rolling_std = df['close'].rolling(window=window).std() z = (last_close - rolling_mean.iloc[-1]) / rolling_std.iloc[-1] return round(z, 2) return "N/A" z60 = get_z_score(60) z120 = get_z_score(120) # Existing EMA Logic (Distance % from EMA) def get_ema_offset(window): if count >= window: from ta.trend import EMAIndicator ema = EMAIndicator(close=df['close'], window=window).ema_indicator().iloc[-1] return round(((last_close / ema) * 100) - 100, 1) return "N/A" k_val = d_val = "N/A" if count >= 14: from ta.momentum import StochasticOscillator # Use high/low columns if they exist, otherwise fallback to close for Stoch high_src = df['high'] if 'high' in df.columns else df['close'] low_src = df['low'] if 'low' in df.columns else df['close'] stoch = StochasticOscillator(high=high_src, low=low_src, close=df['close'], window=14) k_val = round(stoch.stoch().iloc[-1], 0) d_val = round(stoch.stoch_signal().iloc[-1], 0) return { "symbol": self.symbol, "last_date": formatted_date, "last_close": round(last_close, 2), "change_pct": round(change_pct, 2), "low_52": round(float(df.tail(252)['close'].min()), 2), "high_52": round(float(df.tail(252)['close'].max()), 2), "bb_pct": bb_pct, "rsi": rsi_val, "z60": z60, "z120": z120, "last_ema20": get_ema_offset(20), "last_ema50": get_ema_offset(50), "last_ema100": get_ema_offset(100), "last_ema200": get_ema_offset(200), "kd_values": f"{int(k_val)}/{int(d_val)}" if k_val != "N/A" else "N/A" } class StrategyEngine: """ Handles financial strategy simulations and backtesting. This class takes a DataEngine instance to access files. """ def __init__(self, data_engine): # 1. Save the engine object (The 'Supplier') self.data_engine = data_engine # 2. Extract the symbol from the supplier so the chef knows the name # We don't need .strip() here because DataEngine already did it! self.symbol = data_engine.symbol def _find_file(self): # Try the uppercase version first upper_path = os.path.join(self.data_dir, f"{self.symbol}.csv") # Try the lowercase version second lower_path = os.path.join(self.data_dir, f"{self.symbol.lower()}.csv") if os.path.exists(upper_path): return upper_path elif os.path.exists(lower_path): return lower_path # If neither exists, print a very specific message to your terminal print(f"ERROR: Searched for {upper_path} AND {lower_path} - Neither found!") return None def load_data(self): df = pd.read_csv(self.file_path) # Standardize column names to lowercase to avoid 'Price' vs 'price' issues df.columns = [c.lower() for c in df.columns] # Map common variations to a single 'price' column if 'adj close' in df.columns: df = df.rename(columns={'adj close': 'close'}) elif 'close' in df.columns: df = df.rename(columns={'close': 'close'}) return df def calculate_va_vs_dca(self, initial_inv, monthly_target, start_date, allow_sell=True, allow_fractional=True): import math # 1. Load and Prepare Data df = pd.read_csv(self.data_engine.file_path) df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date') # 2. Identify the "Anchor Day" and the "Absolute Latest Day" start_dt_obj = pd.to_datetime(start_date) anchor_day = start_dt_obj.day latest_csv_date = df['date'].max() # This captures 2026-01-27 # --- 3. Filter data starting from your start_date --- df_from_start = df[df['date'] >= start_dt_obj].copy() if df_from_start.empty: return [] # --- 4. Choose rows based on Frequency --- if frequency == "Weekly": # Take every 5th trading day final_df = df_from_start.iloc[::5].copy() step_increment = float((monthly_goal * 12) / 52) elif frequency == "Bi-Weekly": # Take every 10th trading day final_df = df_from_start.iloc[::10].copy() step_increment = float((monthly_goal * 12) / 26) else: # Default to Monthly # Group by year/month and take the first available day >= anchor_day final_df = df_from_start[df_from_start['date'].dt.day >= anchor_day].groupby([ df_from_start['date'].dt.year, df_from_start['date'].dt.month ], as_index=False).first() step_increment = float(monthly_goal) # --- 5. FORCE LAST ROW SAFETY --- # Reset index to make concatenation clean final_df = final_df.reset_index(drop=True) if not df_from_start.empty: latest_row = df_from_start.iloc[[-1]] # Gets the absolute last available day if final_df.empty or latest_row.iloc[0]['date'] != final_df.iloc[-1]['date']: final_df = pd.concat([final_df, latest_row]).drop_duplicates(subset=['date']) # Sort and final index reset before the loop final_df = final_df.sort_values('date').reset_index(drop=True) if final_df.empty: return [] # --- 6. Helper for share calculation --- def get_shares(cash, prc): if prc <= 0: return 0 # Note: allow_fractional should be defined in your class/scope return cash / prc if allow_fractional else math.floor(cash / prc) # --- 7. Initial Setup --- va_shares = 0 dca_shares = 0 va_invested = 0 dca_invested = 0 va_target_value = 0 history = [] # --- 8. Strategy Loop --- for step, (idx, row) in enumerate(final_df.iterrows()): actual_date_str = row['date'].strftime('%Y-%m-%d') price = float(row['close']) if step == 0: # --- STEP 0: INITIAL DEPOSIT --- actual_inv = initial_inv dca_actual_inv = initial_inv va_target_value = initial_inv va_new_shares = get_shares(actual_inv, price) dca_new_shares = va_new_shares else: # --- STEP 1+: DVA vs DCA --- # Use step_increment (calculated in your frequency logic) # to ensure growth matches frequency (Weekly/Monthly) current_increment = step_increment # DCA Logic dca_actual_inv = current_increment dca_new_shares = get_shares(dca_actual_inv, price) # DVA Logic (Fixed Value Path) va_target_value += current_increment # Gap calculation: Target vs. current value BEFORE this step's investment current_va_val_pre = va_shares * price diff = va_target_value - current_va_val_pre # Apply Buy/Sell constraints # note: allow_sell should be defined in your class/scope actual_inv = diff if (diff >= 0 or allow_sell) else 0 va_new_shares = get_shares(actual_inv, price) # --- STATE UPDATES --- va_shares += va_new_shares dca_shares += dca_new_shares va_invested += actual_inv dca_invested += dca_actual_inv # --- Unified History Append --- history.append({ "date": actual_date_str, "price": round(price, 2), # --- DISPLAY STRINGS (For the Web UI) --- "dca_display": f"${round(dca_invested, 2):,.2f} ({dca_new_shares:+.4f})", "va_display": f"${round(actual_inv, 2):,.2f} ({va_new_shares:+.4f})", # --- RAW DATA (Your existing variables kept consistent) --- "dca_value": round(dca_shares * price, 2), "dca_invested": round(dca_invested, 2), "dca_shares_trans": round(dca_new_shares, 4), "dca_shares_total": round(dca_shares, 4), "va_value": round(va_shares * price, 2), "va_invested": round(va_invested, 2), "va_diff": round(actual_inv, 2), # This matches your ($) in the image "va_shares_trans": round(va_new_shares, 4), # This matches your (Δ Shares) "va_shares_total": round(va_shares, 4), "va_target_value": round(va_target_value, 2) }) return history def run_simulation(self, start_date, monthly_goal, initial_inv, frequency="Monthly", allow_sell=True, allow_fractional=True): # 1. DATA LOADING df = pd.read_csv(self.data_engine.file_path) df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date') # 2. DATE PREP start_dt_obj = pd.to_datetime(start_date) latest_csv_date = df['date'].max() # Filter data starting from user's choice df_from_start = df[df['date'] >= start_dt_obj].copy() if df_from_start.empty: return [] # 3. FREQUENCY LOGIC (Hardened) # Standardize string for comparison freq_check = str(frequency).strip().title() if freq_check == "Weekly": final_df = df_from_start.iloc[::5].copy() step_increment = float((monthly_goal * 12) / 52) elif freq_check == "Bi-Weekly": final_df = df_from_start.iloc[::10].copy() step_increment = float((monthly_goal * 12) / 26) else: # For Monthly, we need the anchor day anchor_day = start_dt_obj.day final_df = df_from_start[df_from_start['date'].dt.day >= anchor_day].groupby([ df_from_start['date'].dt.year, df_from_start['date'].dt.month ], as_index=False).first() step_increment = float(monthly_goal) # 4. SAFETY: Ensure most recent price is included final_df = final_df.reset_index(drop=True) last_actual_row = df_from_start.iloc[[-1]] if final_df.empty or last_actual_row.iloc[0]['date'] != final_df.iloc[-1]['date']: final_df = pd.concat([final_df, last_actual_row]).drop_duplicates(subset=['date']) final_df = final_df.sort_values('date').reset_index(drop=True) # 5. STRATEGY INITIALIZATION def get_shares(cash, prc): if prc <= 0: return 0 if allow_fractional: return float(cash / prc) else: # Handles both buying (+) and selling (-) for whole shares return float(math.floor(cash / prc)) if cash >= 0 else float(math.ceil(cash / prc)) # Ensure these are initialized as floats va_shares, dca_shares = 0.0, 0.0 va_invested, dca_invested = 0.0, 0.0 history = [] for step, (idx, row) in enumerate(final_df.iterrows()): actual_date_str = row['date'].strftime('%Y-%m-%d') price = float(row['close']) if step == 0: # First row: Both strategies start with the Initial Investment va_actual_inv = float(initial_inv) dca_actual_inv = float(initial_inv) va_target_value = float(initial_inv) else: # Subsequent rows: Use the frequency-adjusted step_increment va_target_value += step_increment # DCA logic: Always invests the same amount every period dca_actual_inv = float(step_increment) # VA logic: Invests enough to hit the target value current_va_market_val = va_shares * price diff = va_target_value - current_va_market_val # Apply "Allow Sell" constraint va_actual_inv = diff if (diff >= 0 or allow_sell) else 0.0 # Update Shares based on fractional setting va_new_shares = get_shares(va_actual_inv, price) dca_new_shares = get_shares(dca_actual_inv, price) # Running totals for shares va_shares += va_new_shares dca_shares += dca_new_shares # Running totals for principal invested va_invested += va_actual_inv dca_invested += dca_actual_inv history.append({ "date": actual_date_str, "price": round(price, 2), "va_diff": round(va_actual_inv, 2), # Invested this step (VA) "va_shares_trans": round(va_new_shares, 4), "va_value": round(va_shares * price, 2), # Current Portfolio Value (VA) "va_invested": round(va_invested, 2), # Total Out-of-Pocket (VA) "va_shares_total": round(va_shares, 4), "va_target_value": round(float(va_target_value), 2), "dca_diff": round(dca_actual_inv, 2), # Invested this step (DCA) "dca_shares_trans": round(dca_new_shares, 4), "dca_value": round(dca_shares * price, 2), # Current Portfolio Value (DCA) "dca_invested": round(dca_invested, 2), # Total Out-of-Pocket (DCA) "dca_shares_total": round(dca_shares, 4) }) return history