351 lines
14 KiB
Python
351 lines
14 KiB
Python
import pandas as pd
|
|
import requests
|
|
import os
|
|
import shutil
|
|
from datetime import datetime, time
|
|
from ta.trend import EMAIndicator
|
|
from ta.momentum import StochasticOscillator
|
|
|
|
class DataEngine:
|
|
def __init__(self, symbol=None, url=None, provider=None, data_dir='data_cache'):
|
|
self.symbol = symbol
|
|
self.url = url
|
|
self.provider = provider
|
|
|
|
# Use your robust path logic
|
|
base_path = os.path.dirname(os.path.abspath(__file__))
|
|
self.cache_dir = os.path.join(base_path, data_dir) # Use data_dir variable
|
|
os.makedirs(self.cache_dir, exist_ok=True)
|
|
|
|
# 4. Only set file_path if we actually have a symbol
|
|
if self.symbol:
|
|
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
|
|
else:
|
|
self.file_path = None
|
|
|
|
def load_instruments_from_csv(self, file_path):
|
|
import csv
|
|
instruments = []
|
|
|
|
# Updated templates for maximum historical reach
|
|
TEMPLATES = {
|
|
'jpm': "https://am.jpmorgan.com/FundsMarketingHandler/historicalData?cusip={cusip}&country=hk&role=per",
|
|
# period1=0 fetches from the earliest available date; interval=1d is daily
|
|
'yahoo': "https://query1.finance.yahoo.com/v8/finance/chart/{cusip}?period1=0&period2=9999999999&interval=1d&events=history",
|
|
# FT remains 30-day window; Smart Append logic in fetch_data handles the history
|
|
'agi': "https://markets.ft.com/data/funds/tearsheet/historical?s={cusip}"
|
|
}
|
|
|
|
try:
|
|
abs_path = os.path.join(os.path.dirname(__file__), file_path)
|
|
|
|
if not os.path.exists(abs_path):
|
|
print(f"Error: {file_path} not found.")
|
|
return []
|
|
|
|
with open(abs_path, mode='r', encoding='utf-8-sig') as csvfile:
|
|
reader = csv.DictReader(csvfile)
|
|
reader.fieldnames = [name.strip().lower() for name in reader.fieldnames]
|
|
|
|
for row in reader:
|
|
symbol = row.get('symbol', '').strip()
|
|
cusip = row.get('cusip', '').strip()
|
|
provider = row.get('provider', 'jpm').strip().lower()
|
|
|
|
if symbol and cusip:
|
|
template = TEMPLATES.get(provider, TEMPLATES['jpm'])
|
|
url = template.format(cusip=cusip)
|
|
|
|
instruments.append({
|
|
"symbol": symbol,
|
|
"url": url,
|
|
"provider": provider,
|
|
"cusip": cusip # Added this so sync_all can use it if needed
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"CSV Loading Error: {e}")
|
|
return instruments
|
|
# URL_CONFIG = load_instruments_from_csv('instruments.csv')
|
|
|
|
def global_sync(self):
|
|
"""Backup, Sync all instruments, and return a summary report."""
|
|
# 1. Run Maintenance/Backup
|
|
self.run_pre_sync_maintenance()
|
|
|
|
# FIX 1: Add 'self.' so it calls the method inside this class
|
|
instruments = self.load_instruments_from_csv('instruments.csv')
|
|
|
|
report = {
|
|
"total": len(instruments),
|
|
"updated": 0,
|
|
"failed": 0,
|
|
"details": []
|
|
}
|
|
|
|
for item in instruments:
|
|
try:
|
|
self.symbol = item['symbol']
|
|
self.provider = item['provider']
|
|
self.url = item['url']
|
|
|
|
# FIX 2: Use 'self.cache_dir' to match your __init__ logic
|
|
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
|
|
|
|
print(f"Updating {self.symbol}...")
|
|
|
|
# fetch_data now returns the updated DataFrame or None
|
|
result_df = self.fetch_data()
|
|
|
|
time.sleep(1)
|
|
|
|
if result_df is not None and not result_df.empty:
|
|
report["updated"] += 1
|
|
last_price = result_df['close'].iloc[-1]
|
|
report["details"].append(f"✅ {self.symbol}: Updated (Price: {last_price})")
|
|
else:
|
|
report["failed"] += 1
|
|
report["details"].append(f"❌ {self.symbol}: No new data found")
|
|
|
|
except Exception as e:
|
|
report["failed"] += 1
|
|
report["details"].append(f"⚠️ {self.symbol}: Error ({str(e)})")
|
|
|
|
return report
|
|
|
|
def run_pre_sync_maintenance(self):
|
|
"""Backs up files and reports current data health."""
|
|
import os
|
|
import shutil
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
# 1. Setup paths correctly
|
|
base_dir = os.path.dirname(os.path.abspath(__file__))
|
|
backup_dir = os.path.join(base_dir, 'backups')
|
|
|
|
# 2. Create the timestamped folder path FIRST
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
|
|
current_backup_path = os.path.join(backup_dir, f"sync_backup_{timestamp}")
|
|
|
|
# 3. Create the directories (safety-first)
|
|
os.makedirs(current_backup_path, exist_ok=True)
|
|
|
|
print(f"\n--- Pre-Sync Health Check ({timestamp}) ---")
|
|
stats = []
|
|
|
|
# 4. Check if cache exists to avoid errors
|
|
if not os.path.exists(self.cache_dir):
|
|
print(f"⚠️ Cache directory not found at {self.cache_dir}")
|
|
return pd.DataFrame()
|
|
|
|
# 5. Backup loop
|
|
for filename in os.listdir(self.cache_dir):
|
|
if filename.endswith(".csv"):
|
|
src = os.path.join(self.cache_dir, filename)
|
|
dst = os.path.join(current_backup_path, filename)
|
|
|
|
try:
|
|
# Perform copy
|
|
shutil.copy2(src, dst)
|
|
|
|
# Read data for health check
|
|
df = pd.read_csv(src)
|
|
|
|
# Store stats
|
|
stats.append({
|
|
"Fund": filename.replace(".csv", ""),
|
|
"Rows": len(df),
|
|
"Start": df['date'].min() if 'date' in df.columns else "N/A",
|
|
"End": df['date'].max() if 'date' in df.columns else "N/A"
|
|
})
|
|
print(f"📦 Backed up: {filename} ({len(df)} rows)")
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Could not backup {filename}: {e}")
|
|
continue
|
|
|
|
# 6. Display and return report
|
|
if stats:
|
|
stats_df = pd.DataFrame(stats)
|
|
print("\n" + stats_df.to_string(index=False))
|
|
print(f"\n✅ All backups saved to: {current_backup_path}")
|
|
return stats_df
|
|
else:
|
|
print("📭 No CSV files found to backup.")
|
|
return pd.DataFrame()
|
|
|
|
def _parse_jpm(self, json_data):
|
|
if isinstance(json_data, dict) and "historicalNAVList" in json_data:
|
|
df = pd.DataFrame(json_data["historicalNAVList"])
|
|
return df.rename(columns={'navPrice': 'close', 'date': 'date'})
|
|
return None
|
|
|
|
def _parse_ft_html(self, html_text):
|
|
try:
|
|
# 1. Use BeautifulSoup to handle the nested spans in the Date column
|
|
from bs4 import BeautifulSoup
|
|
soup = BeautifulSoup(html_text, 'html.parser')
|
|
|
|
# Find the specific results table
|
|
table = soup.find('table', class_='mod-tearsheet-historical-prices__results')
|
|
if not table:
|
|
print(f"❌ Could not find the results table in the HTML for {self.symbol}")
|
|
return None
|
|
|
|
data = []
|
|
rows = table.find('tbody').find_all('tr')
|
|
|
|
for row in rows:
|
|
cols = row.find_all('td')
|
|
if len(cols) >= 5:
|
|
# The Date cell has two spans. We'll take the first one (Full date).
|
|
date_cell = cols[0].find('span', class_='mod-ui-hide-small-below')
|
|
date_str = date_cell.get_text(strip=True) if date_cell else cols[0].get_text(strip=True)
|
|
|
|
# The Close price is usually the 5th column (index 4)
|
|
close_str = cols[4].get_text(strip=True).replace(',', '')
|
|
|
|
data.append({
|
|
'date': date_str,
|
|
'close': close_str
|
|
})
|
|
|
|
# 2. Convert to DataFrame
|
|
df = pd.DataFrame(data)
|
|
if df.empty:
|
|
return None
|
|
|
|
# 3. Final Type Conversion
|
|
df['date'] = pd.to_datetime(df['date'], errors='coerce')
|
|
df['close'] = pd.to_numeric(df['close'], errors='coerce')
|
|
|
|
return df.dropna().sort_values('date').reset_index(drop=True)
|
|
|
|
except Exception as e:
|
|
print(f"❌ Failed to parse FT HTML structure: {e}")
|
|
return None
|
|
|
|
def _parse_yahoo(self, json_data):
|
|
"""Parses Yahoo Finance v8 Chart JSON"""
|
|
try:
|
|
chart = json_data['chart']['result'][0]
|
|
timestamps = chart['timestamp']
|
|
indicators = chart['indicators']['quote'][0]
|
|
# Use adjclose if available, otherwise close
|
|
closes = indicators.get('close', [])
|
|
df = pd.DataFrame({
|
|
'date': pd.to_datetime(timestamps, unit='s'),
|
|
'close': closes
|
|
})
|
|
return df
|
|
except:
|
|
return None
|
|
|
|
def fetch_data(self):
|
|
local_df = pd.DataFrame()
|
|
new_df = None
|
|
|
|
# 1. Load Local Cache & Force Date Type
|
|
if os.path.exists(self.file_path):
|
|
try:
|
|
local_df = pd.read_csv(self.file_path)
|
|
local_df = local_df.loc[:, ~local_df.columns.duplicated()].copy()
|
|
local_df.columns = [c.lower().strip() for c in local_df.columns]
|
|
local_df = local_df.rename(columns={'price': 'close', 'nav': 'close'})
|
|
|
|
# FORCE CONVERSION: This fixes the '<' error
|
|
# errors='coerce' turns bad text into NaT (Not a Time), which we then drop
|
|
local_df['date'] = pd.to_datetime(local_df['date'], errors='coerce')
|
|
local_df = local_df.dropna(subset=['date']).reset_index(drop=True)
|
|
except Exception as e:
|
|
print(f"Local Load Error: {e}")
|
|
|
|
# 2. Network Fetch
|
|
try:
|
|
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
|
|
response = requests.get(self.url, headers=headers, timeout=15)
|
|
response.raise_for_status()
|
|
|
|
if self.provider == 'agi':
|
|
new_df = self._parse_ft_html(response.text)
|
|
elif self.provider == 'jpm':
|
|
new_df = self._parse_jpm(response.json())
|
|
elif self.provider == 'yahoo':
|
|
new_df = self._parse_yahoo(response.json())
|
|
|
|
# 3. Safe Merge & Sort
|
|
if new_df is not None and not new_df.empty:
|
|
# Force new_df dates to match local_df format
|
|
new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce')
|
|
|
|
combined_df = pd.concat([local_df, new_df], ignore_index=True)
|
|
combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
|
|
|
|
# SORTING: Now safe because all types are Timestamps
|
|
combined_df = combined_df.sort_values('date').reset_index(drop=True)
|
|
|
|
if 'close' in combined_df.columns:
|
|
final_df = combined_df[['date', 'close']].dropna()
|
|
final_df.to_csv(self.file_path, index=False)
|
|
return final_df
|
|
|
|
return local_df
|
|
|
|
except Exception as e:
|
|
print(f"Network error for {self.symbol}: {e}")
|
|
return local_df
|
|
|
|
def get_local_metrics(self):
|
|
"""Reads ONLY from local CSV and returns metrics immediately."""
|
|
if not os.path.exists(self.file_path):
|
|
return {"error": "Missing Local Data", "status": "needs_sync"}
|
|
|
|
try:
|
|
df = pd.read_csv(self.file_path)
|
|
# Ensure columns are clean
|
|
df.columns = [c.lower().strip() for c in df.columns]
|
|
df['date'] = pd.to_datetime(df['date'], errors='coerce')
|
|
df = df.dropna(subset=['date', 'close']).sort_values('date')
|
|
|
|
# Pass this local dataframe to your existing calculation function
|
|
return self.calculate_table_metrics(df)
|
|
except Exception as e:
|
|
print(f"Error reading local data for {self.symbol}: {e}")
|
|
return None
|
|
|
|
|
|
def calculate_table_metrics(self, df):
|
|
if df is None or df.empty or len(df) < 2:
|
|
return None
|
|
|
|
last_close = float(df.iloc[-1]['close'])
|
|
prev_close = float(df.iloc[-2]['close'])
|
|
change_pct = ((last_close - prev_close) / prev_close) * 100
|
|
count = len(df)
|
|
|
|
def get_ema_offset(window):
|
|
if count >= window:
|
|
ema = EMAIndicator(close=df['close'], window=window).ema_indicator().iloc[-1]
|
|
return round(((last_close / ema) * 100) - 100, 1)
|
|
return "N/A"
|
|
|
|
k_val = d_val = "N/A"
|
|
if count >= 14:
|
|
high_14 = df['close'].rolling(window=14).max()
|
|
low_14 = df['close'].rolling(window=14).min()
|
|
stoch = StochasticOscillator(high=high_14, low=low_14, close=df['close'], window=14)
|
|
k_val = round(stoch.stoch().iloc[-1], 0)
|
|
d_val = round(stoch.stoch_signal().iloc[-1], 0)
|
|
|
|
return {
|
|
"last_close": round(last_close, 2),
|
|
"change_pct": round(change_pct, 2),
|
|
"low_52": round(float(df.tail(252)['close'].min()), 2),
|
|
"high_52": round(float(df.tail(252)['close'].max()), 2),
|
|
"last_ema20": get_ema_offset(20),
|
|
"last_ema50": get_ema_offset(50),
|
|
"last_ema100": get_ema_offset(100),
|
|
"last_ema200": get_ema_offset(200),
|
|
"kd_values": f"{k_val}/{d_val}" if k_val != "N/A" else "N/A"
|
|
} |