base version only KD indicator with jpm, ft yahoo as sources

This commit is contained in:
2026-01-27 04:15:19 +08:00
commit 61b32bbdd7
12 changed files with 15992 additions and 0 deletions
+202
View File
@@ -0,0 +1,202 @@
import pandas as pd
import requests
import os
from datetime import datetime
from ta.trend import EMAIndicator
from ta.momentum import StochasticOscillator
class DataEngine:
def __init__(self, symbol, url, provider):
self.symbol = symbol
self.url = url
self.provider = provider
# 1. Get the folder where engine.py lives
base_path = os.path.dirname(os.path.abspath(__file__))
# 2. Define the cache directory path
self.cache_dir = os.path.join(base_path, "data_cache")
# 3. Create the folder if it doesn't exist (safety-first)
os.makedirs(self.cache_dir, exist_ok=True)
# 4. Set the full path for this specific instrument's CSV
self.file_path = os.path.join(self.cache_dir, f"{self.symbol}.csv")
def global_sync(self):
"""The 'One-Click' background loop."""
# 1. Get the latest list of instruments from your CSV
all_instruments = self.load_instruments_from_csv()
for item in all_instruments:
# 2. Update the 'Current' target for the engine
self.symbol = item['symbol']
self.cusip = item['cusip']
self.provider = item['provider']
# 3. Regenerate the URL and File Path for THIS specific instrument
self.url = self.generate_url()
self.file_path = os.path.join(self.data_dir, f"{self.symbol}.csv")
# 4. Run the robust fetch/merge logic we built
print(f"Syncing {self.symbol}...")
self.fetch_data()
print("Global Sync Complete.")
def _parse_jpm(self, json_data):
if isinstance(json_data, dict) and "historicalNAVList" in json_data:
df = pd.DataFrame(json_data["historicalNAVList"])
return df.rename(columns={'navPrice': 'close', 'date': 'date'})
return None
def _parse_ft_html(self, html_text):
try:
# 1. Use BeautifulSoup to handle the nested spans in the Date column
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_text, 'html.parser')
# Find the specific results table
table = soup.find('table', class_='mod-tearsheet-historical-prices__results')
if not table:
print(f"❌ Could not find the results table in the HTML for {self.symbol}")
return None
data = []
rows = table.find('tbody').find_all('tr')
for row in rows:
cols = row.find_all('td')
if len(cols) >= 5:
# The Date cell has two spans. We'll take the first one (Full date).
date_cell = cols[0].find('span', class_='mod-ui-hide-small-below')
date_str = date_cell.get_text(strip=True) if date_cell else cols[0].get_text(strip=True)
# The Close price is usually the 5th column (index 4)
close_str = cols[4].get_text(strip=True).replace(',', '')
data.append({
'date': date_str,
'close': close_str
})
# 2. Convert to DataFrame
df = pd.DataFrame(data)
if df.empty:
return None
# 3. Final Type Conversion
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df['close'] = pd.to_numeric(df['close'], errors='coerce')
return df.dropna().sort_values('date').reset_index(drop=True)
except Exception as e:
print(f"❌ Failed to parse FT HTML structure: {e}")
return None
def _parse_yahoo(self, json_data):
"""Parses Yahoo Finance v8 Chart JSON"""
try:
chart = json_data['chart']['result'][0]
timestamps = chart['timestamp']
indicators = chart['indicators']['quote'][0]
# Use adjclose if available, otherwise close
closes = indicators.get('close', [])
df = pd.DataFrame({
'date': pd.to_datetime(timestamps, unit='s'),
'close': closes
})
return df
except:
return None
def fetch_data(self):
local_df = pd.DataFrame()
new_df = None
# 1. Load Local Cache & Force Date Type
if os.path.exists(self.file_path):
try:
local_df = pd.read_csv(self.file_path)
local_df = local_df.loc[:, ~local_df.columns.duplicated()].copy()
local_df.columns = [c.lower().strip() for c in local_df.columns]
local_df = local_df.rename(columns={'price': 'close', 'nav': 'close'})
# FORCE CONVERSION: This fixes the '<' error
# errors='coerce' turns bad text into NaT (Not a Time), which we then drop
local_df['date'] = pd.to_datetime(local_df['date'], errors='coerce')
local_df = local_df.dropna(subset=['date']).reset_index(drop=True)
except Exception as e:
print(f"Local Load Error: {e}")
# 2. Network Fetch
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
response = requests.get(self.url, headers=headers, timeout=15)
response.raise_for_status()
if self.provider == 'agi':
new_df = self._parse_ft_html(response.text)
elif self.provider == 'jpm':
new_df = self._parse_jpm(response.json())
elif self.provider == 'yahoo':
new_df = self._parse_yahoo(response.json())
# 3. Safe Merge & Sort
if new_df is not None and not new_df.empty:
# Force new_df dates to match local_df format
new_df['date'] = pd.to_datetime(new_df['date'], errors='coerce')
combined_df = pd.concat([local_df, new_df], ignore_index=True)
combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
# SORTING: Now safe because all types are Timestamps
combined_df = combined_df.sort_values('date').reset_index(drop=True)
if 'close' in combined_df.columns:
final_df = combined_df[['date', 'close']].dropna()
final_df.to_csv(self.file_path, index=False)
return final_df
return local_df
except Exception as e:
print(f"Network error for {self.symbol}: {e}")
return local_df
def calculate_table_metrics(self, df):
if df is None or df.empty or len(df) < 2:
return None
last_close = float(df.iloc[-1]['close'])
prev_close = float(df.iloc[-2]['close'])
change_pct = ((last_close - prev_close) / prev_close) * 100
count = len(df)
def get_ema_offset(window):
if count >= window:
ema = EMAIndicator(close=df['close'], window=window).ema_indicator().iloc[-1]
return round(((last_close / ema) * 100) - 100, 1)
return "N/A"
k_val = d_val = "N/A"
if count >= 14:
high_14 = df['close'].rolling(window=14).max()
low_14 = df['close'].rolling(window=14).min()
stoch = StochasticOscillator(high=high_14, low=low_14, close=df['close'], window=14)
k_val = round(stoch.stoch().iloc[-1], 0)
d_val = round(stoch.stoch_signal().iloc[-1], 0)
return {
"last_close": round(last_close, 2),
"change_pct": round(change_pct, 2),
"low_52": round(float(df.tail(252)['close'].min()), 2),
"high_52": round(float(df.tail(252)['close'].max()), 2),
"last_ema20": get_ema_offset(20),
"last_ema50": get_ema_offset(50),
"last_ema100": get_ema_offset(100),
"last_ema200": get_ema_offset(200),
"kd_values": f"{k_val}/{d_val}" if k_val != "N/A" else "N/A"
}