年化20-30%,AI量化平台策略和数据已经更新(代码+数据下载)

def download(request, pk: int):    from lab.config import DATA_DIR    item = StrategyInfo.objects.get(pk=pk)    with open(DATA_DIR.joinpath('strategy.toml').resolve()) as f:        c = f.read()    response = FileResponse()    response['Content-Type'] = 'application/octet-stream'    response['Content-Disposition'] = 'attachment;filename="{0}"'.format('strategy.toml')    return response

吾日三省吾身

有时候,有些事,确实不必太介怀。

或者说,有时候,就是过于敏感。

要往坏处想,也总能“自洽”。

但解决不了任何问题,只会徒增烦恼。

水至清,则无鱼。

只要不触及原则和底限,生活中,允许很多的不确定性。

突变不就是进化的动力来源吗?

允许一些事情发生,并非你所设想,结果超出预期,或者不及预期。

不必气急败坏,更不必撂狠话,更不能做一些过激的事情。

事后看,云淡风清。

“多少人曾在你生命中来了又还”,朋友聚聚散散很正常。

有时候,一些路,总是需要一个人走。

今天的几项工作进展如下:

1、django-apscheduler,下载热门etf/lof数据。

2、把之前的策略发布一遍

3、对所有策略进行回测并保存数据。

4、网站的一些细节:vip的管理后台筛选,社区列表页的样式,文章可编辑和删除,评论可编辑和删除。

数据和日期已经更新到最新:

图片

图片

Django-Apscheduler使用不复杂,但在服务器上部署时,有一点需要特别注意,我们使用了多进程部署,那么会启动多个定时任务管理,会出现任务重复,下面我做了一个基于redis的“任务锁”

def apscheduler_lock(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        lock_key = "apscheduler_lock"
        lock_value = "locked"
        # 连接到Redis
        redis_client = redis.Redis(host=SERVER_IP, port=6379)
        # 尝试获取锁
        acquired_lock = redis_client.set(lock_key, lock_value, nx=True, ex=60)
        if acquired_lock:
            # 如果成功获得了锁,执行apscheduler程序
            result = func(*args, **kwargs)
            # 执行完毕后释放锁
            redis_client.delete(lock_key)
            return result
        else:
            # 没有获得锁,跳过执行apscheduler程序
            return None  # 可以根据需要返回其他信息
        return wrapper

然后我们定期更新etf/lof数据,并且更新策略回测:

def job_update_quotes():
    print(' 任务开始运行!{}'.format(time.strftime("%Y-%m-%d %H:%M:%S")))
    print('job_test')
    items = mongo_utils.get_db()['basic'].find({'tags': {'$in': ['热门']}})
    for item in items:
        symbol = item['symbol']
        print('开始更新:{}'.format(symbol))
        update_quotes(symbol)
        print('完成更新:{}'.format(symbol))


def start_scheduler():
    # 监控任务
    register_events(scheduler)  # 这个event 这个会有已经被废弃的用法删除线,我不知道这个删除了 ,还会不会好用
    scheduler.add_job(job_update_quotes, 'interval', seconds=10)
    scheduler.add_job(run_strategies, 'cron', hour='20', minute='30', args=[], id='tasks', replace_existing=True)
    scheduler.start()

之后,我们需要把数据源切回到线上:

在咱们Quantlab3.7版本中,仍然使用CSVDataloader,在读CSV文件时,会判断symbol_date.csv,如果不存在,尝试从服务器中读取,若读到数据,则覆盖symbol_date.csv。

class CSVDataloader(Dataloader):
    def __init__(self, path:WindowsPath, symbols, start_date='20100101', end_date=datetime.now().strftime('%Y%m%d')):
        super(CSVDataloader, self).__init__(path, symbols, start_date, end_date)
        # 尝试创建当天文件夹
        self.today_path = path.joinpath(datetime.now().date().strftime('%Y%m%d'))
        self.today_path.mkdir(exist_ok=True)

    def _try_get_from_server(self, symbol):
        URL = 'http://127.0.0.1:8000/api/quotes/{}'.format(symbol)
        try:
            items = requests.get(URL).json()['data']
            return pd.DataFrame(items)
        except:
            return None

    def _read_csv(self, csvs, symbol):
        file = symbol + '.csv'
        if file in csvs:
            print('从本地读取:{}'.format(file))
            df = pd.read_csv(self.today_path.joinpath(file), index_col=None)
        else:
            # 从服务器更新
            df = self._try_get_from_server(symbol)
            if df is not None:
                print('从服务器读取:{}'.format(symbol))
                df.to_csv(self.today_path.joinpath(file), index=False)
            else:
                print('获取数据失败:{}'.format(symbol))
        df['date'] = df['date'].apply(lambda x: str(x))
        df.set_index('date', inplace=True)
        df['symbol'] = symbol
        df.index = pd.to_datetime(df.index)
        df.sort_index(inplace=True, ascending=True)
        df = df.loc[self.start_date:, :]
        return df

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/103478
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 2024 年 7 月 29 日
下一篇 2024 年 7 月 29 日

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注