我想把一個 PipelineTask 的實例傳遞給 PipelineTask.add。然而,當我嘗試這麼做時,我收到了 NameError,提示 PipelineTask 沒有定義。
我相信這是因為 PipelineTask 只有在 PipelineTask.__init__() 被呼叫後才會被綁定。
class Task(BaseModel, abc.ABC):
    """Abstract base model for a unit of work identified by ``id``."""

    id: str

    @abc.abstractmethod
    async def run(self):
        """Execute the task; concrete subclasses must override this."""
# Abstract task that is meant to be linked to a following task through ``next``.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements inside ``add`` is ambiguous — confirm against the original source.
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
# NOTE(review): the ``task: PipelineTask`` annotation names the class while its
# own body is still being executed; the accompanying text reports a NameError
# for exactly this name.
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
# ``next`` shadows the builtin of the same name for the rest of this method.
next = self
# No ``next`` field is declared on this class or on Task in this listing, yet
# it is read here.
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
# The loop only stops once ``current`` compares equal to None.
while current != None:
current = current.next
next = current
# NOTE(review): if this line runs after the loop, ``current`` is None here and
# the attribute assignment cannot succeed — verify the intended nesting.
current.next = next
# Returns the receiver itself, which is what lets calls be chained.
return self
class Pipeline(BaseModel):
    """Run a linked sequence of tasks.

    The sequence starts at ``init``; every later task is reached by
    following the ``next`` attribute of the task before it.

    Example (inside a coroutine)::

        pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
        await pipeline.add(ExecutePipelineTask(id='execute'))
        await pipeline.add(CollectResultsPipelineTask(id='collect'))
        await pipeline.run()
    """

    # First task of the chain; the others are reached through ``next`` links.
    init: PipelineTask

    async def run(self):
        """Await ``run()`` on the initial task and then on each linked task."""
        node = self.init
        await node.run()
        # Advance one link at a time until a task has no successor.
        while node.next is not None:
            node = node.next
            await node.run()

    async def add(self, next: PipelineTask):
        """Append ``next`` to the chain by delegating to ``init.add``."""
        self.init.add(next)
class StdoutTask(PipelineTask):
    """Pipeline task that reports its ``id`` on standard output."""

    async def run(self):
        """Print a one-line marker containing this task's ``id``."""
        message = f"[Worker {self.id}] testing..."
        print(message)
async def test_create_pipeline():
    """Chain three StdoutTask instances and run them through a Pipeline."""
    # Each ``add`` call returns the object it was called on, so ``head`` stays
    # bound to the first task while the chain grows.
    head = StdoutTask(id=1, next=None)
    head = head.add(StdoutTask(id=2, next=None))
    head = head.add(StdoutTask(id=3, next=None))
    pipeline = Pipeline(init=head)
    await pipeline.run()
用法示例:
class StdoutTask(PipelineTask):
    """Pipeline task that reports its ``id`` on standard output."""

    async def run(self):
        """Print a one-line marker containing this task's ``id``."""
        message = f"[Worker {self.id}] testing..."
        print(message)
@pytest.mark.asyncio
async def test_create_pipeline():
    """Build a three-task chain with fluent ``add`` calls and run it."""
    first = StdoutTask(id=1)
    second = StdoutTask(id=2)
    chained = first.add(second)
    tasks = chained.add(StdoutTask(id=3))
    pipeline = Pipeline(init=tasks)
    await pipeline.run()
我嘗試過不指定 task 的型別,但隨後 task 缺少 next 屬性(例如 AttributeError: 'XXXX' object has no attribute 'next')
# Variant of ``add`` with the type annotation on ``task`` removed; the body is
# elided in this listing.
def add(self, task):
...
我也試過設定 task.__class__ = PipelineTask,這樣雖然加上了額外的方法,卻沒有加上屬性。
以下是單個檔案可重現的示例
from pydantic import BaseModel
import abc
import asyncio
class Task(BaseModel, abc.ABC):
    """Abstract base model for a unit of work identified by ``id``."""

    id: str

    @abc.abstractmethod
    async def run(self):
        """Execute the task; concrete subclasses must override this."""
# Abstract task that is meant to be linked to a following task through ``next``.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements inside ``add`` is ambiguous — confirm against the original source.
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
# NOTE(review): the ``task: PipelineTask`` annotation names the class while its
# own body is still being executed; the accompanying text reports a NameError
# for exactly this name.
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
# ``next`` shadows the builtin of the same name for the rest of this method.
next = self
# No ``next`` field is declared on this class or on Task in this listing, yet
# it is read here.
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
# The loop only stops once ``current`` compares equal to None.
while current != None:
current = current.next
next = current
# NOTE(review): if this line runs after the loop, ``current`` is None here and
# the attribute assignment cannot succeed — verify the intended nesting.
current.next = next
# Returns the receiver itself, which is what lets calls be chained.
return self
class Pipeline(BaseModel):
    """Run a linked sequence of tasks.

    The sequence starts at ``init``; every later task is reached by
    following the ``next`` attribute of the task before it.

    Example (inside a coroutine)::

        pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
        await pipeline.add(ExecutePipelineTask(id='execute'))
        await pipeline.add(CollectResultsPipelineTask(id='collect'))
        await pipeline.run()
    """

    # First task of the chain; the others are reached through ``next`` links.
    init: PipelineTask

    async def run(self):
        """Await ``run()`` on the initial task and then on each linked task."""
        node = self.init
        await node.run()
        # Advance one link at a time until a task has no successor.
        while node.next is not None:
            node = node.next
            await node.run()

    async def add(self, next: PipelineTask):
        """Append ``next`` to the chain by delegating to ``init.add``."""
        self.init.add(next)
class StdoutTask(PipelineTask):
    """Pipeline task that reports its ``id`` on standard output."""

    async def run(self):
        """Print a one-line marker containing this task's ``id``."""
        message = f"[Worker {self.id}] testing..."
        print(message)
async def test_create_pipeline():
    """Chain three StdoutTask instances and run them through a Pipeline."""
    # Each ``add`` call returns the object it was called on, so ``head`` stays
    # bound to the first task while the chain grows.
    head = StdoutTask(id=1, next=None)
    head = head.add(StdoutTask(id=2, next=None))
    head = head.add(StdoutTask(id=3, next=None))
    pipeline = Pipeline(init=head)
    await pipeline.run()
解決方案
用 getattr
def add(self, task: "PipelineTask"):
    """Append ``task`` to the end of the chain that starts at ``self``.

    The annotation is written as a string so that the class name is not
    looked up while the class body is still being executed.

    Args:
        task: The node to link after the current last node.

    Returns:
        ``self``, so that calls can be chained:
        ``head.add(a).add(b)`` yields ``head -> a -> b``.
    """
    # Walk to the last node. ``getattr`` with a default tolerates nodes on
    # which ``next`` was never assigned.
    tail = self
    while getattr(tail, "next", None) is not None:
        tail = tail.next
    # NOTE(review): on a pydantic model this assignment presumably requires
    # ``next`` to be a declared field — confirm against the model definition.
    tail.next = task
    return self
uj5u.com熱心網友回復:
問題可能出在 if self.next == None: 這一行;只要在 __init__ 裡設定 self.next,或改用 getattr(self, "next", None) 來取值即可。
>>> class Test():
... def add(self):
... print(getattr(self, "missing", None))
...
>>> t = Test()
>>> t.add()
None
>>> t.missing = "not missing"
>>> t.add()
not missing
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/367721.html
下一篇:根據條目的型別洗掉資料框中的行
