Value averaging model build; can use Yahoo and CSV data from other sources

This commit is contained in:
2026-01-28 08:48:53 +08:00
parent 9e7f474d5e
commit cf708d2466
12 changed files with 38342 additions and 27 deletions
+244 -21
View File
@@ -1,56 +1,107 @@
import pandas as pd
import requests
import os
import csv
import shutil
from datetime import datetime, time
import yfinance as yf
from ta.trend import EMAIndicator
from ta.momentum import StochasticOscillator
import math
class DataEngine:
def __init__(self, symbol=None, url=None, provider=None, data_dir='data_cache'):
    """Configure the engine for one instrument and make sure its data is cached.

    Args:
        symbol: Ticker; surrounding whitespace is stripped and it is upper-cased.
            When empty/None no cache file is associated with the engine.
        url: Data URL used only when the symbol is not in instruments.csv.
        provider: Provider name used only when the symbol is not in
            instruments.csv; falls back to 'yahoo' when not given.
        data_dir: Cache directory, joined onto this module's directory.
    """
    # 1. Clean the incoming symbol
    self.symbol = symbol.strip().upper() if symbol else None

    # 2. Setup centralized paths (relative to this module, not the CWD)
    base_path = os.path.dirname(os.path.abspath(__file__))
    self.cache_dir = os.path.join(base_path, data_dir)
    os.makedirs(self.cache_dir, exist_ok=True)

    # 3. Load the master instrument list to find URLs/Providers
    self.master_instruments = self.load_instruments_from_csv('instruments.csv')

    # 4. Find config from master list or use passed-in arguments
    instrument_config = next(
        (i for i in self.master_instruments if i['symbol'] == self.symbol),
        None,
    )
    if instrument_config:
        self.url = instrument_config['url']
        self.provider = instrument_config['provider']
    else:
        # Fallback to arguments if ticker isn't in the CSV list
        self.url = url
        self.provider = provider or 'yahoo'

    # 5. Define final file path for centralized storage.
    # ensure_data() is only meaningful with a symbol: without one there is
    # no file path to check and os.path.exists(None) would raise TypeError.
    if self.symbol:
        self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
        self.ensure_data()
    else:
        self.file_path = None
def ensure_data(self):
    """Check that the cache file exists; if not, download it with yfinance.

    Returns:
        bool: True when the cache file already exists or was written by this
        call. False when no cache path is configured, when yfinance returned
        no rows, or when the download/save raised.
    """
    # No symbol means no file path; nothing can be checked or downloaded.
    if not self.file_path:
        return False

    if os.path.exists(self.file_path):
        return True  # Data is already there

    print(f"DEBUG: {self.symbol} not found in cache. Attempting download...")
    try:
        # For a generic ticker like SPY, we use yfinance
        import yfinance as yf

        df = yf.download(self.symbol, period="max")
        if df.empty:
            print(f"ERROR: No data found for {self.symbol}")
            return False

        # 1. If columns are MultiIndex (tuples), keep only the first level
        #    (the price name)
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)

        # 2. Reset index to turn 'Date' into a column
        df = df.reset_index()

        # 3. Now it is safe to lowercase the column names
        df.columns = [str(c).lower() for c in df.columns]

        df.to_csv(self.file_path, index=False)
        print(f"DEBUG: Successfully cached {self.symbol}")
        return True
    except Exception as e:
        # Best-effort download: report the failure and let the caller decide.
        print(f"ERROR: Download failed for {self.symbol}: {e}")
        return False
def load_instruments_from_csv(self, file_path):
import csv
instruments = []
# Updated templates for maximum historical reach
TEMPLATES = {
'jpm': "https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={cusip}&country=hk&role=per",
# period1=0 fetches from the earliest available date; interval=1d is daily
'yahoo': "https://query1.finance.yahoo.com/v8/finance/chart/{cusip}?period1=0&period2=9999999999&interval=1d&events=history",
# FT remains 30-day window; Smart Append logic in fetch_data handles the history
'agi': "https://markets.ft.com/data/funds/tearsheet/historical?s={cusip}"
}
try:
abs_path = os.path.join(os.path.dirname(__file__), file_path)
# Get absolute path relative to this script
abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
if not os.path.exists(abs_path):
print(f"Error: {file_path} not found.")
print(f"Error: Master list {file_path} not found at {abs_path}")
return []
with open(abs_path, mode='r', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile)
# Clean header names (lowercase + remove whitespace)
reader.fieldnames = [name.strip().lower() for name in reader.fieldnames]
for row in reader:
symbol = row.get('symbol', '').strip()
cusip = row.get('cusip', '').strip()
provider = row.get('provider', 'jpm').strip().lower()
# Use .get() with fallback to avoid KeyErrors
symbol = (row.get('symbol') or '').strip().upper()
cusip = (row.get('cusip') or '').strip()
provider = (row.get('provider') or 'jpm').strip().lower()
if symbol and cusip:
template = TEMPLATES.get(provider, TEMPLATES['jpm'])
@@ -60,14 +111,37 @@ class DataEngine:
"symbol": symbol,
"url": url,
"provider": provider,
"cusip": cusip # Added this so sync_all can use it if needed
"cusip": cusip
})
except Exception as e:
print(f"CSV Loading Error: {e}")
print(f"CRITICAL: Failed to load instruments.csv: {e}")
return instruments
# URL_CONFIG = load_instruments_from_csv('instruments.csv')
def _ensure_data_exists(self):
    """Download the symbol's data when its cache file is missing.

    A symbol found in the master instrument list is fetched through
    ``_download_from_provider``; any other symbol falls back to
    ``_download_generic_yahoo``. Does nothing when there is no cache path
    or the file already exists.
    """
    if not self.file_path or os.path.exists(self.file_path):
        return

    # The constructor stores the list as ``master_instruments``; the older
    # ``instruments`` attribute is still honoured if that is what is set.
    instruments = getattr(self, 'master_instruments', None)
    if instruments is None:
        instruments = getattr(self, 'instruments', [])

    # Check if this symbol exists in our master CSV mapping
    match = next(
        (i for i in instruments if i['symbol'].upper() == self.symbol),
        None,
    )

    if match:
        print(f"DEBUG: Found {self.symbol} in master list. Fetching from {match['provider']}...")
        self._download_from_provider(match)
    else:
        print(f"DEBUG: {self.symbol} not in master list. Trying generic Yahoo Finance...")
        self._download_generic_yahoo()
def _download_generic_yahoo(self):
    """Standard yfinance fallback: download full history and cache it as CSV.

    Writes ``self.file_path`` with lower-cased column names and the date
    index turned into a column. Nothing is written when yfinance returns
    no rows; any exception is reported and swallowed (best effort).
    """
    try:
        df = yf.download(self.symbol, period="max")
        if df.empty:
            return

        # Column labels may be tuples (MultiIndex); keep only the first
        # level so they can be lower-cased as plain strings, the same way
        # ensure_data() prepares its download.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)

        df = df.reset_index()
        df.columns = [str(c).lower() for c in df.columns]
        df.to_csv(self.file_path, index=False)
    except Exception as e:
        print(f"Yahoo fallback failed: {e}")
def global_sync(self):
"""Backup, Sync all instruments, and return a summary report."""
# 1. Run Maintenance/Backup
@@ -348,4 +422,153 @@ class DataEngine:
"last_ema100": get_ema_offset(100),
"last_ema200": get_ema_offset(200),
"kd_values": f"{k_val}/{d_val}" if k_val != "N/A" else "N/A"
}
}
class StrategyEngine:
    """
    Handles financial strategy simulations and backtesting.

    This class takes a DataEngine instance to access files.
    """

    def __init__(self, data_engine):
        """Bind the strategy to the engine that owns the cached price file.

        Args:
            data_engine: Object exposing ``symbol`` and ``file_path`` and,
                optionally, ``cache_dir`` (as DataEngine does).
        """
        # 1. Save the engine object (the data supplier)
        self.data_engine = data_engine
        # 2. The symbol is taken as-is; DataEngine has already cleaned it.
        self.symbol = data_engine.symbol
        # 3. Locations read by _find_file() and load_data().
        self.data_dir = getattr(data_engine, 'cache_dir', None)
        self.file_path = getattr(data_engine, 'file_path', None)

    @staticmethod
    def _read_prices(path):
        """Read a price CSV and normalise its header to lowercase names.

        An 'adj close' column is renamed to 'close' only when the file has
        no 'close' column of its own, so the result never carries two
        columns with the same name.
        """
        df = pd.read_csv(path)
        df.columns = [str(c).strip().lower() for c in df.columns]
        if 'close' not in df.columns and 'adj close' in df.columns:
            df = df.rename(columns={'adj close': 'close'})
        return df

    def _find_file(self):
        """Return the path of the symbol's CSV in the data directory, or None.

        Looks for ``<SYMBOL>.csv`` first and then the lower-cased file name.
        """
        if not self.data_dir or not self.symbol:
            print("ERROR: No data directory or symbol configured - cannot locate file!")
            return None

        # Try the symbol as stored first, then the lowercase version
        upper_path = os.path.join(self.data_dir, f"{self.symbol}.csv")
        lower_path = os.path.join(self.data_dir, f"{self.symbol.lower()}.csv")

        for candidate in (upper_path, lower_path):
            if os.path.exists(candidate):
                return candidate

        # If neither exists, print a very specific message to the terminal
        print(f"ERROR: Searched for {upper_path} AND {lower_path} - Neither found!")
        return None

    def load_data(self):
        """Load ``self.file_path`` with lower-cased columns and a 'close' column."""
        return self._read_prices(self.file_path)

    def calculate_va_vs_dca(self, initial_inv, monthly_target, start_date,
                            allow_sell=True, allow_fractional=True):
        """Simulate value averaging (VA) against dollar-cost averaging (DCA).

        Args:
            initial_inv: Amount invested by both strategies on the first date.
            monthly_target: DCA contribution per period, and the amount by
                which the VA target value grows per period.
            start_date: First date to consider; its day-of-month is the
                recurring anchor day.
            allow_sell: When False, a negative VA gap results in no trade
                instead of a sale.
            allow_fractional: When False, share counts are floored to whole
                shares.

        Returns:
            list[dict]: One record per simulated date with price, values,
            invested totals and share counts for both strategies. Empty when
            the file has no rows on/after ``start_date``.
        """
        # 1. Load and prepare data (header normalised so 'Date'/'Close'
        #    style files work as well)
        df = self._read_prices(self.data_engine.file_path)
        df['date'] = pd.to_datetime(df['date'])
        df = df.sort_values('date')

        # 2. Identify the anchor day and the latest date in the file
        start_dt_obj = pd.to_datetime(start_date)
        anchor_day = start_dt_obj.day
        latest_csv_date = df['date'].max()

        # 3. Keep only rows from start_date onwards
        df_filtered = df[df['date'] >= start_dt_obj].copy()
        if df_filtered.empty:
            return []

        # 4. Per calendar month, take the first row on/after the anchor day.
        # NOTE(review): a month with no row on/after the anchor day (e.g.
        # anchor 31 in February) contributes no period - confirm intended.
        eligible = df_filtered[df_filtered['date'].dt.day >= anchor_day]
        month_key = eligible['date'].dt.to_period('M').rename('_month')
        monthly_df = eligible.groupby(month_key).first().reset_index(drop=True)

        # 5. Force the latest row of the file to be the final period
        if monthly_df.empty:
            monthly_df = df_filtered[df_filtered['date'] == latest_csv_date]
        elif monthly_df.iloc[-1]['date'] != latest_csv_date:
            last_row = df_filtered[df_filtered['date'] == latest_csv_date]
            monthly_df = pd.concat([monthly_df, last_row]).drop_duplicates(subset=['date'])

        if monthly_df.empty:
            return []

        def get_shares(cash, prc):
            """Convert a cash amount into shares, honouring the fractional toggle."""
            if prc <= 0:
                return 0
            # NOTE(review): floor() rounds a negative (sell) amount away from
            # zero in whole-share mode - confirm that is the wanted rounding.
            return cash / prc if allow_fractional else math.floor(cash / prc)

        # 6. Running state for both strategies
        va_shares = 0
        dca_shares = 0
        va_invested = 0
        dca_invested = 0
        va_target_value = 0
        history = []

        # 7. Strategy loop, one iteration per selected date
        periods = zip(pd.to_datetime(monthly_df['date']), monthly_df['close'])
        for period_no, (when, raw_price) in enumerate(periods):
            price = float(raw_price)

            if period_no == 0:
                # --- PERIOD 0: INITIAL DEPOSIT (identical for both) ---
                actual_inv = initial_inv
                dca_actual_inv = initial_inv
                va_target_value = initial_inv
                diff = 0
                va_new_shares = get_shares(actual_inv, price)
                dca_new_shares = va_new_shares
            else:
                # --- DCA: fixed contribution ---
                dca_actual_inv = monthly_target
                dca_new_shares = get_shares(dca_actual_inv, price)

                # --- VA: grow the target, then trade the gap between the
                # target and the holding's value BEFORE this period's trade ---
                va_target_value += monthly_target
                diff = va_target_value - va_shares * price
                actual_inv = diff if (diff >= 0 or allow_sell) else 0
                va_new_shares = get_shares(actual_inv, price)

            # --- State updates (apply to every period) ---
            va_shares += va_new_shares
            dca_shares += dca_new_shares
            va_invested += actual_inv
            dca_invested += dca_actual_inv

            history.append({
                "date": when.strftime('%Y-%m-%d'),
                "price": round(price, 2),
                "dca_value": round(dca_shares * price, 2),
                "dca_invested": round(dca_invested, 2),
                "dca_shares_trans": round(dca_new_shares, 4),
                "dca_shares_total": round(dca_shares, 4),
                "va_value": round(va_shares * price, 2),
                "va_invested": round(va_invested, 2),
                "va_diff": round(actual_inv, 2),
                "va_shares_trans": round(va_new_shares, 4),
                "va_shares_total": round(va_shares, 4),
                "va_target_value": round(va_target_value, 2),
            })
            # Debugging print
            print(f"Date: {when.strftime('%Y-%m')}, Target: {va_target_value:.2f}, Portfolio: {va_invested:.2f}, Diff: {diff:.2f}")

        return history