# NOTE: web-viewer residue, not part of this module:
# "Code pull complete; the page will refresh automatically."
from datetime import datetime
import sqlalchemy as sa
from celery import states
from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, mapped_column
from libs.datetime_utils import naive_utc_now
from .base import TypeBase
from .types import BinaryData, LongText
class CeleryTask(TypeBase):
    """Stored status and result of a single Celery task.

    Rows live in the ``celery_taskmeta`` table. Only ``task_id`` has no
    default; ``status`` defaults to ``states.PENDING`` and every other
    payload column defaults to ``None``.
    """

    __tablename__ = "celery_taskmeta"

    # Surrogate primary key fed by the "task_id_sequence" sequence.
    # init=False — presumably keeps it out of the dataclass-style
    # constructor provided by TypeBase; confirm against the base class.
    id: Mapped[int] = mapped_column(
        sa.Integer,
        sa.Sequence("task_id_sequence"),
        init=False,
        primary_key=True,
        autoincrement=True,
    )

    # Task identifier; unique across the table.
    task_id: Mapped[str] = mapped_column(
        String(155),
        unique=True,
    )

    # Task state string; starts out as Celery's PENDING state.
    status: Mapped[str] = mapped_column(
        String(50),
        default=states.PENDING,
    )

    # Binary result payload (encoding is not visible from this file).
    result: Mapped[bytes | None] = mapped_column(
        BinaryData,
        default=None,
        nullable=True,
    )

    # Python-side default is None; the column-level defaults call
    # naive_utc_now on INSERT and again on every UPDATE.
    date_done: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True,
        default=None,
        insert_default=naive_utc_now,
        onupdate=naive_utc_now,
    )

    traceback: Mapped[str | None] = mapped_column(
        LongText,
        default=None,
        nullable=True,
    )

    # Optional task metadata columns; all nullable, all default to None.
    name: Mapped[str | None] = mapped_column(
        String(155),
        default=None,
        nullable=True,
    )
    args: Mapped[bytes | None] = mapped_column(
        BinaryData,
        default=None,
        nullable=True,
    )
    kwargs: Mapped[bytes | None] = mapped_column(
        BinaryData,
        default=None,
        nullable=True,
    )
    worker: Mapped[str | None] = mapped_column(
        String(155),
        default=None,
        nullable=True,
    )
    retries: Mapped[int | None] = mapped_column(
        sa.Integer,
        default=None,
        nullable=True,
    )
    queue: Mapped[str | None] = mapped_column(
        String(155),
        default=None,
        nullable=True,
    )
class CeleryTaskSet(TypeBase):
    """Stored result of a Celery task set.

    Rows live in the ``celery_tasksetmeta`` table. Only ``taskset_id`` has
    no default; the remaining columns default to ``None``.
    """

    __tablename__ = "celery_tasksetmeta"

    # Surrogate primary key fed by the "taskset_id_sequence" sequence.
    # init=False — presumably keeps it out of the dataclass-style
    # constructor provided by TypeBase; confirm against the base class.
    id: Mapped[int] = mapped_column(
        sa.Integer,
        sa.Sequence("taskset_id_sequence"),
        init=False,
        primary_key=True,
        autoincrement=True,
    )

    # Task-set identifier; unique across the table.
    taskset_id: Mapped[str] = mapped_column(
        String(155),
        unique=True,
    )

    # Binary result payload (encoding is not visible from this file).
    result: Mapped[bytes | None] = mapped_column(
        BinaryData,
        default=None,
        nullable=True,
    )

    # Python-side default is None; the column-level default calls
    # naive_utc_now on INSERT. No onupdate hook is configured here.
    date_done: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True,
        default=None,
        insert_default=naive_utc_now,
    )
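# NOTE: web-viewer residue, not part of this module:
# "This section may contain content unsuitable for display, so the page does
# not show it. You can review and revise it using the relevant editing
# features. If you confirm the content involves no inappropriate language,
# pure advertising, violence, vulgar or pornographic material, infringement,
# piracy, false information, worthless content, or anything violating
# applicable national laws and regulations, you may click submit to appeal
# and we will handle it as soon as possible."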