Python金融大數據分析-1

1.使用Beautiful Soup 爬取金融數據

import requests
import re
import json

def normalize_ticker(raw: str) -> str:
    """
    如果使用者只輸入數字 (如 2330、2881),就加上 .TW
    否則保留原樣 (可以是 AAPL、TSM、2330.TW...)
    """
    t = raw.strip().upper()
    if t.isdigit():  # 純數字就視為台灣股票
        return f"{t}.TW"
    return t

def get_stock_price_api(ticker: str) -> float:
    """
    透過 Yahoo Finance Chart API 取得即時股價
    """
    url = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}"
    params = {
        "interval": "2m",
        "range":    "1d",
        "includePrePost": "false"
    }
    headers = {"User-Agent": "Mozilla/5.0"}
    resp = requests.get(url, params=params, headers=headers)
    resp.raise_for_status()
    data = resp.json()
    return data["chart"]["result"][0]["meta"]["regularMarketPrice"]



def main():
    raw = input("請輸入股票代號 (例如 2330.TW、2881、AAPL): ").strip()
    if not raw:
        print("未輸入股票代號,程式結束。")
        return

    ticker = normalize_ticker(raw)
    print(f"\n=== 抓取 {ticker} 即時股價 ===")


    try:
        price1 = get_stock_price_api(ticker)
        print(f"  {ticker} 股價:{price1:.2f}")
    except Exception as e:
        print(f"  取得失敗:{e}")



if __name__ == "__main__":
    main()

2.將爬取之金融數據並以csv 或json格式存檔,可以的話,用排程方式記錄兩筆 

import yfinance as yf
import pandas as pd
import json
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def normalize_ticker(raw: str) -> str:
    """
    將純數字自動補上 .TW,其它代號原樣回傳
    """
    t = raw.strip().upper()
    return f"{t}.TW" if t.isdigit() else t

def fetch_history(ticker: str,
                  period: str = "1mo",
                  interval: str = "1d") -> pd.DataFrame:
    """
    用 yfinance 拿歷史資料,回傳 pandas.DataFrame
    """
    tk = yf.Ticker(ticker)
    df = tk.history(period=period, interval=interval, auto_adjust=False)
    if df.empty:
        raise ValueError(f"No data found for {ticker}")
    df = df.reset_index()
    df["Date"] = df["Date"].dt.strftime("%Y-%m-%d")
    return df

def save_csv(df: pd.DataFrame, filename: str):
    df.to_csv(filename, index=False, encoding="utf-8-sig")
    print(f"[{datetime.datetime.now()}] 已將資料存成 CSV:{filename}")

def save_json(df: pd.DataFrame, filename: str):
    records = df.to_dict(orient="records")
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=4)
    print(f"[{datetime.datetime.now()}] 已將資料存成 JSON:{filename}")

def job(ticker: str, fmt: str):
    """
    真正在排程時呼叫的函式:抓取資料、加時間戳、存檔
    """
    try:
        df = fetch_history(ticker, period="1mo", interval="1d")
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{ticker.replace('.', '_')}_{ts}.{fmt}"
        if fmt == "json":
            save_json(df, filename)
        else:
            save_csv(df, filename)
    except Exception as e:
        print(f"[{datetime.datetime.now()}] 下載失敗:{e}")

def main():
    raw = input("請輸入股票代號(如 2330、2330.TW、AAPL): ").strip()
    if not raw:
        print("未輸入任何代號,程式結束。")
        return

    ticker = normalize_ticker(raw)
    fmt = input("請選擇儲存格式 (csv/json) [預設 csv]: ").strip().lower() or "csv"

    scheduler = BlockingScheduler(timezone="Asia/Taipei")

    # 立刻執行一次
    scheduler.add_job(
        job,
        trigger='date',
        run_date=datetime.datetime.now(),
        args=[ticker, fmt]
    )

    # 每天 09:00 執行一次
    scheduler.add_job(
        job,
        trigger='cron',
        args=[ticker, fmt],
        hour=9, minute=0
    )

    # 每天 15:30 再執行一次
    scheduler.add_job(
        job,
        trigger='cron',
        args=[ticker, fmt],
        hour=15, minute=30
    )

    print(f"→ 已排程:立刻下載一次;之後每日 09:00 與 15:30 自動下載 {ticker} 並存成 {fmt} 檔")
    scheduler.start()

if __name__ == "__main__":
    main()

3.舉例將csv資料寫入sqlite資料庫並以sql語法設定條件呈現所篩選的數據

import sqlite3
import csv
import os

DB_FILE  = "financial_data.db"
CSV_FILE = "2330_TW_20250418_100530.csv"

def import_csv_to_sqlite(csv_path: str, db_path: str):
    """匯入 CSV 到 SQLite(重複執行不會重複插入)。"""
    conn = sqlite3.connect(db_path)
    cur  = conn.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS history (
        Date   TEXT PRIMARY KEY,
        Open   REAL,
        High   REAL,
        Low    REAL,
        Close  REAL,
        Volume INTEGER
    )""")
    conn.commit()

    with open(csv_path, newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        rows = [
            (r['Date'],
             float(r['Open']),
             float(r['High']),
             float(r['Low']),
             float(r['Close']),
             int(r['Volume']))
            for r in reader
        ]
    # 使用 INSERT OR IGNORE 避免重複鍵錯誤
    cur.executemany(
        "INSERT OR IGNORE INTO history VALUES (?,?,?,?,?,?)",
        rows
    )
    conn.commit()
    print(f"已匯入 {len(rows)} 筆資料到 {db_path} 的 history 表。")
    return conn, cur

def query_with_params(cur):
    # 1) 查詢收盤價門檻
    thresh = input("\n請輸入要篩選的「收盤價」下限: ").strip()
    try:
        thresh = float(thresh)
    except ValueError:
        print("門檻必須是數字,跳過此查詢。")
    else:
        sql1 = "SELECT Date, Close FROM history WHERE Close > ? ORDER BY Date"
        rows1 = cur.execute(sql1, (thresh,)).fetchall()
        print(f"\n>>> 收盤價 > {thresh} 的結果共 {len(rows1)} 筆:")
        if rows1:
            for d, c in rows1:
                print(f"{d}  收盤:{c}")
        else:
            print("(無資料)")

    # 2) 查詢日期區間
    start = input("\n請輸入查詢起始日期(YYYY-MM-DD),或留空跳過: ").strip()
    end   = input("請輸入查詢結束日期(YYYY-MM-DD),或留空跳過: ").strip()
    if start and end:
        sql2 = "SELECT * FROM history WHERE Date BETWEEN ? AND ? ORDER BY Date"
        rows2 = cur.execute(sql2, (start, end)).fetchall()
        print(f"\n>>> {start} ~ {end} 區間內的資料共 {len(rows2)} 筆:")
        if rows2:
            for row in rows2:
                # row = (Date, Open, High, Low, Close, Volume)
                print(row)
        else:
            print("(無資料)")
    else:
        print("跳過日期區間查詢。")

    # 3) 查詢成交量大於平均量
    # 先計算全表平均量
    avg_vol = cur.execute("SELECT AVG(Volume) FROM history").fetchone()[0]
    sql3 = "SELECT Date, Volume FROM history WHERE Volume > ? ORDER BY Volume DESC"
    rows3 = cur.execute(sql3, (avg_vol,)).fetchall()
    print(f"\n>>> 成交量 > 全期平均 ({int(avg_vol):,}) 的資料共 {len(rows3)} 筆:")
    for d, v in rows3:
        print(f"{d} 量:{v:,}")

def main():
    if not os.path.isfile(CSV_FILE):
        print(f"找不到 CSV:{CSV_FILE},請確認檔案位置。")
        return

    conn, cur = import_csv_to_sqlite(CSV_FILE, DB_FILE)
    query_with_params(cur)
    conn.close()
    print("\nDone. 已關閉資料庫連線。")

if __name__ == "__main__":
    main()

4.範例

import twstock
import pandas as pd
# 導入twstock及pandas模組,pandas模組縮寫為pd

target_stock = '00940'  #股票代號變數
stock = twstock.Stock(target_stock)  #告訴twstock我們要查詢的股票
target_price = stock.fetch_from(2020, 5)  #取用2020/05至今每天的交易資料

name_attribute = [
    'Date', 'Capacity', 'Turnover', 'Open', 'High', 'Low', 'Close', 'Change',
    'Transcation'
]  #幫收集到的資料設定表頭

df = pd.DataFrame(columns=name_attribute, data=target_price)
#將twstock抓到的清單轉成Data Frame格式的資料表

filename = f'./data/{target_stock}_kay.csv'
#指定Data Frame轉存csv檔案的檔名與路徑

df.to_csv(filename)
#將Data Frame轉存為csv檔案

發表迴響