跳转至

write_data

FactorReader

Source code in pure_ocean_breeze/data/write_data.py
Python
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
class FactorReader:
    def __init__(
        self,
        user: str = "admin",
        password: str = "quest",
        host: str = "127.0.0.1",
        port: str = "8812",
        database: str = "qdb",
    ) -> None:
        """Connect to QuestDB through its PostgreSQL wire protocol (psycopg2 driver).

        Parameters
        ----------
        user : str, optional
            user name, by default "admin"
        password : str, optional
            password, by default "quest"
        host : str, optional
            host address, by default "127.0.0.1"
        port : str, optional
            port, by default "8812"
        database : str, optional
            database name, by default "qdb"
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.database = database

    def __connect(self):
        """Open a fresh psycopg2 connection with the stored credentials."""
        conn = pg.connect(
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port,
            database=self.database,
        )
        return conn

    def update_factor(self, table_name: str, df: pd.DataFrame):
        """Upload a wide factor dataframe (date index x code columns) to QuestDB.

        If the table already exists, only rows newer than the stored max date
        are appended; otherwise the whole dataframe is written.
        """
        tables = self.__get_data("show tables").table.tolist()
        if table_name in tables:
            logger.info(f"{table_name}已经存在了,即将更新")
            old_end = self.__get_data(f"select max(date) from {table_name}").iloc[0, 0]
            new = df[df.index > old_end]
        else:
            logger.info(f"{table_name}第一次上传")
            new = df
        # long format: one row per (date, code) observation
        new = new.stack().reset_index()
        new.columns = ["date", "code", "fac"]
        self.__write_via_df(new, table_name)

    def __write_via_df(
        self,
        df: pd.DataFrame,
        table_name: str,
        symbols=None,
        tuple_col=None,
    ) -> None:
        """Write a dataframe into QuestDB via the official ILP client (port 9009).

        Parameters
        ----------
        df : pd.DataFrame
            dataframe to write
        table_name : str
            target table name in QuestDB
        symbols : Union[str, bool, List[int], List[str]], optional
            names of the columns to be stored as symbols, by default None
        tuple_col : Union[str, List[str]], optional
            names of columns holding tuple/list values (stringified first),
            by default None
        """
        if tuple_col is not None:
            cols = [tuple_col] if isinstance(tuple_col, str) else tuple_col
            for col in cols:
                df[col] = df[col].apply(str)
        with qdbing.Sender(self.host, 9009) as sender:
            if symbols is not None:
                sender.dataframe(df, table_name=table_name, symbols=symbols)
            else:
                sender.dataframe(df, table_name=table_name)

    @retry(stop=stop_after_attempt(10))
    def __get_data(self, sql_order: str) -> pd.DataFrame:
        """Run a SQL query against the database and return the result.

        Parameters
        ----------
        sql_order : str
            SQL statement to execute

        Returns
        -------
        pd.DataFrame
            query result, with column names taken from the cursor description
        """
        conn = self.__connect()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql_order)
                df_data = cursor.fetchall()
                columns = [i[0] for i in cursor.description]
            finally:
                cursor.close()
        finally:
            # fix: the original leaked one open connection per query
            conn.close()
        return pd.DataFrame(df_data, columns=columns)

    def add_token(self, tokens: List[str], users: List[str]):
        """Append (token, user) pairs to the "tokenlines" table."""
        tus = pd.DataFrame({"token": tokens, "user": users})
        self.__write_via_df(tus, "tokenlines")

__get_data(sql_order)

以sql命令的方式,从数据库中读取数据

Parameters

sql_order : str sql命令

Returns

pd.DataFrame 读取的结果

Source code in pure_ocean_breeze/data/write_data.py
Python
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
@retry(stop=stop_after_attempt(10))
def __get_data(self, sql_order: str) -> pd.DataFrame:
    """Run a SQL query against the database and return the result.

    Parameters
    ----------
    sql_order : str
        SQL statement to execute

    Returns
    -------
    pd.DataFrame
        query result, with column names taken from the cursor description
    """
    conn = self.__connect()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql_order)
            df_data = cursor.fetchall()
            columns = [i[0] for i in cursor.description]
        finally:
            cursor.close()
    finally:
        # fix: the original leaked one open connection per query
        conn.close()
    return pd.DataFrame(df_data, columns=columns)

__init__(user='admin', password='quest', host='127.0.0.1', port='8812', database='qdb')

通过postgre的psycopg2驱动连接questdb数据库

Parameters

user : str, optional 用户名, by default "admin" password : str, optional 密码, by default "quest" host : str, optional 地址, by default "127.0.0.1" port : str, optional 端口, by default "8812" database : str, optional 数据库, by default "qdb"

Source code in pure_ocean_breeze/data/write_data.py
Python
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
def __init__(
    self,
    user: str = "admin",
    password: str = "quest",
    host: str = "127.0.0.1",
    port: str = "8812",
    database: str = "qdb",
) -> None:
    """Connect to QuestDB through its PostgreSQL wire protocol (psycopg2 driver).

    Parameters
    ----------
    user : str, optional
        user name, by default "admin"
    password : str, optional
        password, by default "quest"
    host : str, optional
        host address, by default "127.0.0.1"
    port : str, optional
        port, by default "8812"
    database : str, optional
        database name, by default "qdb"
    """
    self.user = user
    self.password = password
    self.host = host
    self.port = port
    self.database = database

__write_via_df(df, table_name, symbols=None, tuple_col=None)

通过questdb的python库直接将dataframe写入quested数据库

Parameters

df : pd.DataFrame 要写入的dataframe table_name : str questdb中该表的表名 symbols : Union[str, bool, List[int], List[str]], optional 为symbols的那些列的名称, by default None tuple_col : Union[str, List[str]], optional 数据类型为tuple或list的列的名字, by default None

Source code in pure_ocean_breeze/data/write_data.py
Python
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
def __write_via_df(
    self,
    df: pd.DataFrame,
    table_name: str,
    symbols=None,
    tuple_col=None,
) -> None:
    """Write a dataframe into QuestDB via the official ILP client (port 9009).

    Parameters
    ----------
    df : pd.DataFrame
        dataframe to write
    table_name : str
        target table name in QuestDB
    symbols : Union[str, bool, List[int], List[str]], optional
        names of the columns to be stored as symbols, by default None
    tuple_col : Union[str, List[str]], optional
        names of columns holding tuple/list values (stringified before
        writing, since ILP has no list type), by default None
    """
    # Normalise tuple_col to a list of column names, then stringify each.
    if tuple_col is not None:
        cols = [tuple_col] if isinstance(tuple_col, str) else tuple_col
        for col in cols:
            df[col] = df[col].apply(str)
    with qdbing.Sender(self.host, 9009) as sender:
        if symbols is not None:
            sender.dataframe(df, table_name=table_name, symbols=symbols)
        else:
            sender.dataframe(df, table_name=table_name)

database_save_final_factors(df, name, order, freq='月')

保存最终因子的因子值

Parameters

df : pd.DataFrame 最终因子值 name : str 因子的名字,如“适度冒险” order : int 因子的序号

Source code in pure_ocean_breeze/data/write_data.py
Python
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
def database_save_final_factors(
    df: pd.DataFrame, name: str, order: int, freq: str = "月"
) -> None:
    """Save the final factor values to the local factor file store.

    Parameters
    ----------
    df : pd.DataFrame
        final factor values (index is the trading date)
    name : str
        factor name, e.g. "适度冒险"
    order : int
        factor serial number
    freq : str, optional
        frequency tag appended to the file name, by default "月"
    """
    homeplace = HomePlace()
    # File name pattern: <name>_多因子<order>_<freq>.parquet
    path = f"{homeplace.final_factor_file}{name}_多因子{order}_{freq}.parquet"
    df = df.drop_duplicates().dropna(how="all")
    df.to_parquet(path)
    final_date = datetime.datetime.strftime(df.index.max(), "%Y%m%d")
    logger.success(f"今日计算的因子值保存,最新一天为{final_date}")

database_update_daily_files()

更新数据库中的日频数据

Raises

ValueError 如果上次更新到本次更新没有新的交易日,将报错

Source code in pure_ocean_breeze/data/write_data.py
Python
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
def database_update_daily_files() -> None:
    """Update the daily-frequency data files in the local store.

    Resumes from the day after the oldest "last stored date" across all the
    daily parquet files, downloads every missing trading day, and appends
    the new rows to each wide (date x code) matrix file.

    Raises
    ------
    `ValueError`
        if no new trading day has passed since the last update
        (NOTE(review): the current implementation only logs a message and
        returns without raising — confirm intended behavior)
    """
    homeplace = HomePlace()

    def single_file(name):
        # Resume point for one file: the day after its newest stored date.
        df = pd.read_parquet(homeplace.daily_data_file + name + ".parquet")
        startdate = df.index.max() + pd.Timedelta(days=1)
        return startdate

    # All daily matrix files tracked by this updater.
    names = [
        "opens",
        "highs",
        "lows",
        "closes",
        "trs",
        "opens_unadj",
        "highs_unadj",
        "lows_unadj",
        "closes_unadj",
        "sharenums",
        "total_sharenums",
        "ages",
        "sts",
        "states",
        "amounts",
        "pb",
        "pe",
        'pettm',
        "vwaps",
        "adjfactors",
        "stop_ups",
        "stop_downs",
    ]
    startdates = list(map(single_file, names))
    # Use the oldest resume date so every file gets fully caught up.
    startdate = min(startdates)
    startdate = datetime.datetime.strftime(startdate, "%Y%m%d")
    now = datetime.datetime.now()
    # Before 17:00 today's data is not final yet, so stop at yesterday.
    if now.hour < 17:
        now = now - pd.Timedelta(days=1)
    now = datetime.datetime.strftime(now, "%Y%m%d")
    logger.info(f"日频数据上次更新到{startdate},本次将更新到{now}")

    # trading calendar
    df0 = download_calendar(startdate, now)
    tradedates = sorted(list(set(df0.trade_date)))
    finish = 1
    if len(tradedates) > 1:
        # download each day's data
        df1s = []
        df2s = []
        for day in tqdm.auto.tqdm(tradedates, desc="正在下载日频数据"):
            df1, df2 = download_single_daily(day)
            df1s.append(df1)
            df2s.append(df2)
        # 8 prices, trade status, volume
        df1s = pd.concat(df1s)
        # turnover rate and float shares; turnover is later divided by 100,
        # float shares multiplied by 10000
        df2s = pd.concat(df2s)
    elif len(tradedates) == 1:
        df1s, df2s = download_single_daily(tradedates[0])
    else:
        finish = 0
        logger.info("从上次更新到这次更新,还没有经过交易日。放假就好好休息吧,别跑代码了🤒")
    if finish:
        # trade status -> 1 if tradable ("交易"), else 0
        df1s.tradestatus = (df1s.tradestatus == "交易") + 0
        df2s = df2s.rename(columns={"ts_code": "code"})
        df1s.trade_date = pd.to_datetime(df1s.trade_date, format="%Y%m%d")
        df2s.trade_date = pd.to_datetime(df2s.trade_date, format="%Y%m%d")
        df1s = df1s.rename(columns={"trade_date": "date"})
        df2s = df2s.rename(columns={"trade_date": "date"})
        # keep only codes present in both downloads
        both_codes = list(set(df1s.code) & set(df2s.code))
        df1s = df1s[df1s.code.isin(both_codes)]
        df2s = df2s[df2s.code.isin(both_codes)]
        # ST (special-treatment) stock flags
        df3 = pro.ashare_st()

        def to_mat(df, row, name, ind="date", col="code"):
            # Pivot one long column into a date x code matrix, merge with the
            # stored file, de-duplicate, sort the columns, and save it back.
            df = df[[ind, col, row]].pivot(index=ind, columns=col, values=row)
            old = pd.read_parquet(homeplace.daily_data_file + name + ".parquet")
            new = pd.concat([old, df]).drop_duplicates()
            new = drop_duplicates_index(new)
            new = new[sorted(list(new.columns))]
            new.to_parquet(homeplace.daily_data_file + name + ".parquet")
            logger.success(name + "已更新")
            return new

        # daily stock quotes (unadjusted & adjusted OHLC, trade status, volume)
        part1 = df1s.copy()
        part1.volume = part1.volume * 100
        # unadjusted open
        opens = to_mat(part1, "open", "opens_unadj")
        # unadjusted high
        highs = to_mat(part1, "high", "highs_unadj")
        # unadjusted low
        lows = to_mat(part1, "low", "lows_unadj")
        # unadjusted close
        closes = to_mat(part1, "close", "closes_unadj")
        # volume (stored in the "amounts" file)
        volumes = to_mat(part1, "volume", "amounts")
        # adjusted open
        diopens = to_mat(part1, "adjopen", "opens")
        # adjusted high
        dihighs = to_mat(part1, "adjhigh", "highs")
        # adjusted low
        dilows = to_mat(part1, "adjlow", "lows")
        # adjusted close
        dicloses = to_mat(part1, "adjclose", "closes")
        # trade status
        status = to_mat(part1, "tradestatus", "states")
        # volume-weighted average price
        vwaps = to_mat(part1, "avgprice", "vwaps")
        # adjustment factor
        adjfactors = to_mat(part1, "adjfactor", "adjfactors")
        # limit-up price
        stop_ups = to_mat(part1, "limit", "stop_ups")
        # limit-down price
        stop_downs = to_mat(part1, "stopping", "stop_downs")

        # turnover rate
        part2 = df2s[["date", "code", "turnover_rate"]].pivot(
            index="date", columns="code", values="turnover_rate"
        )
        part2 = part2 / 100
        part2_old = pd.read_parquet(homeplace.daily_data_file + "trs.parquet")
        part2_new = pd.concat([part2_old, part2])
        part2_new = part2_new.drop_duplicates()
        part2_new = part2_new[closes.columns]
        part2_new = part2_new[sorted(list(part2_new.columns))]
        part2_new = drop_duplicates_index(part2_new)
        part2_new.to_parquet(homeplace.daily_data_file + "trs.parquet")
        logger.success("换手率更新完成")

        # float shares
        # read the newly changed float-share counts
        part3 = df2s[["date", "code", "float_share"]].pivot(
            columns="code", index="date", values="float_share"
        )
        part3 = part3 * 10000
        part3_old = pd.read_parquet(homeplace.daily_data_file + "sharenums.parquet")
        part3_new = pd.concat([part3_old, part3]).drop_duplicates()
        part3_new = part3_new[closes.columns]
        part3_new = drop_duplicates_index(part3_new)
        part3_new = part3_new[sorted(list(part3_new.columns))]
        part3_new.to_parquet(homeplace.daily_data_file + "sharenums.parquet")
        logger.success("流通股数更新完成")

        # total shares
        # read the newly changed total-share counts
        part3a = df2s[["date", "code", "total_share"]].pivot(
            columns="code", index="date", values="total_share"
        )
        part3a = part3a * 10000
        part3_olda = pd.read_parquet(
            homeplace.daily_data_file + "total_sharenums.parquet"
        )
        part3_newa = pd.concat([part3_olda, part3a]).drop_duplicates()
        part3_newa = part3_newa.reindex(columns=closes.columns)
        part3_newa = drop_duplicates_index(part3_newa)
        part3_newa = part3_newa[sorted(list(part3_newa.columns))]
        part3_newa.to_parquet(homeplace.daily_data_file + "total_sharenums.parquet")
        logger.success("总股数更新完成")

        # price-to-book ratio
        partpb = df2s[["date", "code", "pb"]].pivot(
            index="date", columns="code", values="pb"
        )
        partpb_old = pd.read_parquet(homeplace.daily_data_file + "pb.parquet")
        partpb_new = pd.concat([partpb_old, partpb])
        partpb_new = partpb_new.drop_duplicates()
        partpb_new = partpb_new[closes.columns]
        partpb_new = partpb_new[sorted(list(partpb_new.columns))]
        partpb_new = drop_duplicates_index(partpb_new)
        partpb_new.to_parquet(homeplace.daily_data_file + "pb.parquet")
        logger.success("市净率更新完成")

        # price-to-earnings ratio
        partpe = df2s[["date", "code", "pe"]].pivot(
            index="date", columns="code", values="pe"
        )
        partpe_old = pd.read_parquet(homeplace.daily_data_file + "pe.parquet")
        partpe_new = pd.concat([partpe_old, partpe])
        partpe_new = partpe_new.drop_duplicates()
        partpe_new = partpe_new[closes.columns]
        partpe_new = partpe_new[sorted(list(partpe_new.columns))]
        partpe_new = drop_duplicates_index(partpe_new)
        partpe_new.to_parquet(homeplace.daily_data_file + "pe.parquet")
        logger.success("市盈率更新完成")

        # trailing-twelve-month price-to-earnings ratio
        partpe = df2s[["date", "code", "pe_ttm"]].pivot(
            index="date", columns="code", values="pe_ttm"
        )
        partpe_old = pd.read_parquet(homeplace.daily_data_file + "pettm.parquet")
        partpe_new = pd.concat([partpe_old, partpe])
        partpe_new = partpe_new.drop_duplicates()
        partpe_new = partpe_new[closes.columns]
        partpe_new = partpe_new[sorted(list(partpe_new.columns))]
        partpe_new = drop_duplicates_index(partpe_new)
        partpe_new.to_parquet(homeplace.daily_data_file + "pettm.parquet")
        logger.success("TTM市盈率更新完成")

        # ST flags: build a daily 0/1 matrix from (entry_dt, remove_dt) spans
        part4 = df3[["s_info_windcode", "entry_dt", "remove_dt"]]
        part4 = part4.sort_values("s_info_windcode")
        # open spans (still ST today) get the current date as their end
        part4.remove_dt = part4.remove_dt.fillna(now).astype(int)
        part4 = part4.set_index("s_info_windcode").stack()
        # "he" numbers each (entry, remove) pair so spans can be grouped
        part4 = part4.reset_index().assign(
            he=sorted(list(range(int(part4.shape[0] / 2))) * 2)
        )
        part4 = part4.drop(columns=["level_1"])
        part4.columns = ["code", "date", "he"]
        part4.date = pd.to_datetime(part4.date, format="%Y%m%d")

        def single(df):
            # Expand one (entry, remove) span to a daily series via ffill.
            full = pd.DataFrame({"date": pd.date_range(df.date.min(), df.date.max())})
            df = pd.merge(full, df, on=["date"], how="left")
            df = df.fillna(method="ffill")
            return df

        tqdm.auto.tqdm.pandas()
        part4 = part4.groupby(["code", "he"]).progress_apply(single)
        part4 = part4[part4.date.isin(list(part2_new.index))]
        part4 = part4.reset_index(drop=True)
        part4 = part4.assign(st=1)

        part4 = part4.drop_duplicates(subset=["date", "code"]).pivot(
            index="date", columns="code", values="st"
        )

        # Align the ST matrix onto the full date x code grid, NaN -> 0.
        part4_0 = pd.DataFrame(0, columns=part2_new.columns, index=part2_new.index)
        part4_0 = part4_0 + part4
        part4_0 = part4_0.replace(np.nan, 0)
        part4_0 = part4_0[part4_0.index.isin(list(part2_new.index))]
        part4_0 = part4_0.T
        part4_0 = part4_0[part4_0.index.isin(list(part2_new.columns))]
        part4_0 = part4_0.T
        part4_0 = part4_0[closes.columns]
        part4_0 = drop_duplicates_index(part4_0)
        part4_0 = part4_0[sorted(list(part4_0.columns))]
        part4_0.to_parquet(homeplace.daily_data_file + "sts.parquet")
        logger.success("st更新完了")

        # days since listing: cumulative count of days with a close price
        part5_close = pd.read_parquet(
            homeplace.update_data_file + "BasicFactor_Close.parquet"
        )
        # pre-2004 history kept from the raw file (int yyyymmdd index)
        part5_close = part5_close[part5_close.index < 20040101]
        part5_close.index = pd.to_datetime(part5_close.index, format="%Y%m%d")
        part5_close = pd.concat([part5_close, closes]).drop_duplicates()
        # sign -> 1 where a price exists; ffill + cumsum counts listed days
        part5 = np.sign(part5_close).fillna(method="ffill").cumsum()
        part5 = part5[part5.index.isin(list(part2_new.index))]
        part5 = part5.T
        part5 = part5[part5.index.isin(list(part2_new.columns))]
        part5 = part5.T
        part5 = part5[closes.columns]
        part5 = drop_duplicates_index(part5)
        part5 = part5[sorted(list(part5.columns))]
        part5.to_parquet(homeplace.daily_data_file + "ages.parquet")
        logger.success("上市天数更新完了")

database_update_index_three()

读取三大指数的原始行情数据,返回并保存在本地

Source code in pure_ocean_breeze/data/write_data.py
Python
896
897
898
899
900
901
902
903
904
def database_update_index_three():
    """Fetch raw quotes for the three major indexes and save them locally."""
    # CSI 300, CSI 500, CSI 1000
    frames = [
        download_single_index(code)
        for code in ("000300.SH", "000905.SH", "000852.SH")
    ]
    res = pd.concat(frames, axis=1)
    new_date = datetime.datetime.strftime(res.index.max(), "%Y%m%d")
    res.to_parquet(homeplace.daily_data_file + "3510行情.parquet")
    logger.success(f"3510行情数据已经更新至{new_date}")

database_update_minute_data_to_clickhouse_and_questdb(kind, web_port='9001')

使用米筐更新分钟数据至clickhouse和questdb中

Parameters

kind : str 更新股票分钟数据或指数分钟数据,股票则'stock',指数则'index' web_port : str questdb数据库的web console的端口号, by default '9001'

Raises

IOError 如果未指定股票还是指数,将报错

Source code in pure_ocean_breeze/data/write_data.py
Python
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def database_update_minute_data_to_clickhouse_and_questdb(
    kind: str, web_port: str = "9001"
) -> None:
    """使用米筐更新分钟数据至clickhouse和questdb中

    Parameters
    ----------
    kind : str
        更新股票分钟数据或指数分钟数据,股票则'stock',指数则'index'
    web_port : str
        questdb数据库的web console的端口号, by default '9001'

    Raises
    ------
    `IOError`
        如果未指定股票还是指数,将报错
    """
    if kind == "stock":
        code_type = "CS"
    elif kind == "index":
        code_type = "INDX"
    else:
        raise IOError("总得指定一种类型吧?请从stock和index中选一个")
    # 获取剩余使用额
    user1 = round(rqdatac.user.get_quota()["bytes_used"] / 1024 / 1024, 2)
    logger.info(f"今日已使用rqsdk流量{user1}MB")
    # 获取全部股票/指数代码
    cs = rqdatac.all_instruments(type=code_type, market="cn", date=None)
    codes = list(cs.order_book_id)
    # 获取上次更新截止时间
    chc = ClickHouseClient("minute_data")
    last_date = max(chc.show_all_dates(f"minute_data_{kind}"))
    # 本次更新起始日期
    start_date = pd.Timestamp(str(last_date)) + pd.Timedelta(days=1)
    start_date = datetime.datetime.strftime(start_date, "%Y-%m-%d")
    # 本次更新终止日期
    end_date = datetime.datetime.now()
    if end_date.hour < 17:
        end_date = end_date - pd.Timedelta(days=1)
    end_date = datetime.datetime.strftime(end_date, "%Y-%m-%d")
    logger.info(f"本次将下载从{start_date}{end_date}的数据")
    # 下载数据
    ts = rqdatac.get_price(
        codes,
        start_date=start_date,
        end_date=end_date,
        frequency="1m",
        fields=["volume", "total_turnover", "high", "low", "close", "open"],
        adjust_type="none",
        skip_suspended=False,
        market="cn",
        expect_df=True,
    )
    if ts is not None:
        # 调整数据格式
        ts = ts.reset_index()
        ts = ts.rename(
            columns={
                "order_book_id": "code",
                "datetime": "date",
                "volume": "amount",
                "total_turnover": "money",
            }
        )
        ts = ts.sort_values(["code", "date"])
        ts.date = ts.date.dt.strftime("%Y%m%d").astype(int)
        ts = ts.groupby(["code", "date"]).apply(
            lambda x: x.assign(num=list(range(1, x.shape[0] + 1)))
        )
        ts = (np.around(ts.set_index("code"), 2) * 100).astype(int).reset_index()
        ts.code = ts.code.str.replace(".XSHE", ".SZ")
        ts.code = ts.code.str.replace(".XSHG", ".SH")
        # 数据写入数据库
        ts.to_sql(f"minute_data_{kind}", chc.engine, if_exists="append", index=False)
        ts = ts.set_index("code")
        ts = ts / 100
        ts = ts.reset_index()
        ts.date = ts.date.astype(int).astype(str)
        ts.num = ts.num.astype(int).astype(str)
        qdb = Questdb(web_port=web_port)
        qdb.write_via_df(ts, f"minute_data_{kind}")
        # 获取剩余使用额
        user2 = round(rqdatac.user.get_quota()["bytes_used"] / 1024 / 1024, 2)
        user12 = round(user2 - user1, 2)
        logger.info(f"今日已使用rqsdk流量{user2}MB,本项更新消耗流量{user12}MB")
    else:
        logger.warning(f"从{start_date}{end_date}暂无数据")

database_update_minute_data_to_questdb(kind, web_port='9001')

使用米筐更新分钟数据至questdb中

Parameters

kind : str 更新股票分钟数据或指数分钟数据,股票则'stock',指数则'index' web_port : str questdb数据库的控制台端口号, by default '9001'

Raises

IOError 如果未指定股票还是指数,将报错

Source code in pure_ocean_breeze/data/write_data.py
Python
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def database_update_minute_data_to_questdb(kind: str, web_port: str = "9001") -> None:
    """使用米筐更新分钟数据至questdb中

    Parameters
    ----------
    kind : str
        更新股票分钟数据或指数分钟数据,股票则'stock',指数则'index'
    web_port : str
        questdb数据库的控制台端口号, by default '9001'

    Raises
    ------
    `IOError`
        如果未指定股票还是指数,将报错
    """
    if kind == "stock":
        code_type = "CS"
    elif kind == "index":
        code_type = "INDX"
    else:
        raise IOError("总得指定一种类型吧?请从stock和index中选一个")
    # 获取剩余使用额
    user1 = round(rqdatac.user.get_quota()["bytes_used"] / 1024 / 1024, 2)
    logger.info(f"今日已使用rqsdk流量{user1}MB")
    # 获取全部股票/指数代码
    cs = rqdatac.all_instruments(type=code_type, market="cn", date=None)
    codes = list(cs.order_book_id)
    # 获取上次更新截止时间
    qdb = Questdb(web_port=web_port)
    last_date = max(qdb.show_all_dates(f"minute_data_{kind}"))
    # 本次更新起始日期
    start_date = pd.Timestamp(str(last_date)) + pd.Timedelta(days=1)
    start_date = datetime.datetime.strftime(start_date, "%Y-%m-%d")
    # 本次更新终止日期
    end_date = datetime.datetime.now()
    if end_date.hour < 17:
        end_date = end_date - pd.Timedelta(days=1)
    end_date = datetime.datetime.strftime(end_date, "%Y-%m-%d")
    logger.info(f"本次将下载从{start_date}{end_date}的数据")
    # 下载数据
    ts = rqdatac.get_price(
        codes,
        start_date=start_date,
        end_date=end_date,
        frequency="1m",
        fields=["volume", "total_turnover", "high", "low", "close", "open"],
        adjust_type="none",
        skip_suspended=False,
        market="cn",
        expect_df=True,
    )
    if ts is not None:
        # 调整数据格式
        ts = ts.reset_index()
        ts = ts.rename(
            columns={
                "order_book_id": "code",
                "datetime": "date",
                "volume": "amount",
                "total_turnover": "money",
            }
        )
        ts = ts.sort_values(["code", "date"])
        ts.date = ts.date.dt.strftime("%Y%m%d").astype(int)
        ts = ts.groupby(["code", "date"]).apply(
            lambda x: x.assign(num=list(range(1, x.shape[0] + 1)))
        )
        ts = ts.ffill().dropna()
        ts.code = ts.code.str.replace(".XSHE", ".SZ")
        ts.code = ts.code.str.replace(".XSHG", ".SH")
        ts.date = ts.date.astype(int).astype(str)
        ts.num = ts.num.astype(int).astype(str)
        # 数据写入数据库
        qdb.write_via_df(ts, f"minute_data_{kind}")
        # 获取剩余使用额
        user2 = round(rqdatac.user.get_quota()["bytes_used"] / 1024 / 1024, 2)
        user12 = round(user2 - user1, 2)
        logger.info(f"今日已使用rqsdk流量{user2}MB,本项更新消耗流量{user12}MB")
    else:
        logger.warning(f"从{start_date}{end_date}暂无数据")

database_update_zxindustry_member()

更新中信一级行业的成分股

Source code in pure_ocean_breeze/data/write_data.py
Python
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
def database_update_zxindustry_member():
    """更新中信一级行业的成分股"""
    old_codes = pd.read_parquet(homeplace.daily_data_file + "中信一级行业哑变量代码版.parquet")
    old_names = pd.read_parquet(homeplace.daily_data_file + "中信一级行业哑变量名称版.parquet")
    old_enddate = old_codes.date.max()
    old_enddate_str = datetime.datetime.strftime(old_enddate, "%Y%m%d")
    now = datetime.datetime.now()
    now_str = datetime.datetime.strftime(now, "%Y%m%d")
    logger.info(f"中信一级行业数据,上次更新到了{old_enddate_str},本次将更新至{now_str}")
    start_date = old_enddate + pd.Timedelta(days=1)
    start_date = datetime.datetime.strftime(start_date, "%Y%m%d")
    codes = list(
        set(rqdatac.all_instruments(type="CS", market="cn", date=None).order_book_id)
    )
    trs = read_daily(tr=1)
    trs = trs[trs.index > old_enddate]
    dates = list(trs.index)
    dfs_codes = []
    dfs_names = []
    if len(dates) >= 1:
        for date in tqdm.auto.tqdm(dates):
            df = rqdatac.get_instrument_industry(
                codes, source="citics_2019", date=date, level=1
            )
            if df.shape[0] > 0:
                df_code = df.first_industry_code.to_frame(date)
                df_name = df.first_industry_name.to_frame(date)
                dfs_codes.append(df_code)
                dfs_names.append(df_name)
        dfs_codes = pd.concat(dfs_codes, axis=1)
        dfs_names = pd.concat(dfs_names, axis=1)

        def new_get_dummies(df):
            dums = []
            for col in tqdm.auto.tqdm(list(df.columns)):
                series = df[col]
                dum = pd.get_dummies(series)
                dum = dum.reset_index()
                dum = dum.assign(date=col)
                dums.append(dum)
            dums = pd.concat(dums)
            return dums

        dfs_codes = new_get_dummies(dfs_codes)
        dfs_names = new_get_dummies(dfs_names)

        a = read_daily(tr=1, start=20100101)

        def save(df, old, file):
            df = df.rename(columns={"order_book_id": "code"})
            df = df[["date", "code"] + sorted(list(df.columns)[1:-1])]
            df.code = df.code.apply(lambda x: convert_code(x)[0])
            df = pd.concat([old, df], ignore_index=True)
            df = df[df.date.isin(list(a.index))]
            df=df.reset_index(drop=True).replace(True,1).replace(False,0)
            df.to_parquet(homeplace.daily_data_file + file)
            return df

        dfs_codes = save(dfs_codes, old_codes, "中信一级行业哑变量代码版.parquet")
        dfs_names = save(dfs_names, old_names, "中信一级行业哑变量名称版.parquet")
        logger.success(f"中信一级行业数据已经更新至{now_str}了")
    else:
        logger.warning(f"从{start_date}{now_str}暂无数据")

download_calendar(startdate, enddate)

下载从startdate到enddate的交易日历数据

Source code in pure_ocean_breeze/data/write_data.py
Python
439
440
441
442
443
444
445
446
447
448
449
450
451
452
@retry
def download_calendar(startdate, enddate):
    """Download the trading calendar between startdate and enddate.

    On any failure, waits 60 seconds and tries once more; a second failure
    propagates to the outer @retry decorator. (The original docstring,
    "更新单日的数据", was copy-pasted from the single-day downloader.)
    """
    try:
        # trading calendar
        df0 = pro.a_calendar(start_date=startdate, end_date=enddate)
    except Exception:
        time.sleep(60)
        # trading calendar (second attempt)
        df0 = pro.a_calendar(start_date=startdate, end_date=enddate)
    time.sleep(1)  # rate-limit courtesy pause
    return df0

download_single_daily(day)

更新单日的数据

Source code in pure_ocean_breeze/data/write_data.py
Python
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
@retry
def download_single_daily(day):
    """Download one trading day's quotes and daily basics.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        df1: 8 prices, trade status, volume, adjustment factor, limits;
        df2: turnover rate, share counts, pb/pe/pe_ttm (turnover must later
        be divided by 100, float shares multiplied by 10000).

    On any failure, waits 60 seconds and tries once more; a second failure
    propagates to the outer @retry decorator.
    """

    def _fetch():
        # 8 prices, trade status, volume
        df1 = pro.a_daily(
            trade_date=day,
            fields=[
                "code",
                "trade_date",
                "open",
                "high",
                "low",
                "close",
                "volume",
                "adjopen",
                "adjclose",
                "adjhigh",
                "adjlow",
                "tradestatus",
                "adjfactor",
                "limit",
                "stopping",
                "avgprice",
            ],
        )
        # turnover rate, share counts, valuation ratios
        df2 = pro.daily_basic(
            trade_date=day,
            fields=[
                "ts_code",
                "trade_date",
                "turnover_rate",
                "total_share",
                "float_share",
                "pe",
                "pb",
                "pe_ttm",
            ],
        )
        time.sleep(1)  # rate-limit courtesy pause
        return df1, df2

    try:
        return _fetch()
    except Exception:
        time.sleep(60)
        return _fetch()

download_single_day_style(day)

更新单日的数据

Source code in pure_ocean_breeze/data/write_data.py
Python
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
@retry
def download_single_day_style(day):
    """Download one day's Barra style-factor exposures.

    On any failure, waits 60 seconds and tries once more; a second failure
    propagates to the outer @retry decorator.
    """
    # single source of truth for the requested exposure fields
    fields = "tradeDate,ticker,BETA,MOMENTUM,SIZE,EARNYILD,RESVOL,GROWTH,BTOP,LEVERAGE,LIQUIDTY,SIZENL"
    try:
        style = pro.RMExposureDayGet(trade_date=str(day), fields=fields)
    except Exception:
        time.sleep(60)
        style = pro.RMExposureDayGet(trade_date=str(day), fields=fields)
    time.sleep(1)  # rate-limit courtesy pause
    return style