当前位置: 首页 > 面试题库 >

如何控制气流安装的并行性或并发性?

柯河
2023-03-14
问题内容

在我的某些Apache Airflow安装中,即使调度程序似乎未完全加载,预定运行的DAG或任务也不会运行。如何增加可以同时运行的DAG或任务的数量?

同样,如果我的安装处于高负载状态,并且我想限制Airflow工作人员拉出排队任务的速度(例如以减少资源消耗),我该如何调整以减少平均负载?


问题答案:

这是自Airflow
v1.10.2起可用的配置选项的扩展列表。可以在每个DAG或每个操作员的基础上进行设置,但是如果未指定,则可能会落回到设置范围的默认值。

可以 基于每个DAG 指定的选项:

  • concurrency:已设置为允许在DAG的所有活动运行中同时运行的任务实例数。默认为core.dag_concurrency未设置
  • max_active_runs:此DAG的最大活动运行次数。一旦达到此限制,调度程序将不会创建新的活动DAG运行。默认为core.max_active_runs_per_dag未设置

例子:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

可以 在每个操作员的基础上 指定的选项:

  • pool:用于执行任务的池。池只能用于限制 部分 任务的并行性
  • task_concurrency:具有相同执行日期的任务运行的并发限制

例:

t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)

在整个Airflow设置 中指定的选项:

  • core.parallelism:整个Airflow安装中运行的最大任务数
  • core.dag_concurrency:每个DAG可以运行的最大任务数(跨多个 DAG运行
  • core.non_pooled_task_slot_count:分配给不在池中运行的任务的任务插槽数
  • core.max_active_runs_per_dag:每个DAG的最大活动DAG 运行 次数
  • scheduler.max_threads:调度程序进程应使用多少个线程来调度DAG
  • celery.worker_concurrency如果使用CeleryExecutor,则 工作者一次将处理的最大任务实例数 __
  • celery.sync_parallelism:CeleryExecutor用于同步任务状态的进程数


 类似资料:
  • 我知道我可以通过以下方式将转换为。 我现在的问题是,我在那个序列中有700个未来,我希望能够控制并行解决其中的多少个,因为每个未来都将调用内部rest api,同时有700个请求就像对那个服务器发起dos攻击。 我宁愿一次只能解决10个期货。 我如何才能做到这一点? 尝试pamu的答案,我看到了错误:

  • 我正在使用kafka-node ConsumerGroup来消费来自主题的消息。ConsumerGroup在使用消息时需要调用外部API,这可能需要一秒钟的时间来响应。我希望控制从队列中消费下一条消息,直到我从API得到响应,这样消息就会被顺序地处理。 我该如何控制这种行为?

  • 如何编写限制Qpromise并发的方法? 例如,我有一个方法。 我希望一次生成不超过5个进程,但对调用代码是透明的。 我需要实现的是一个带有签名的函数 我可以这样称呼他 我已经开始编写我的版本,但我想知道是否有人有一个简洁的实现,我可以对照它进行检查。

  • 根据文档[1],我一直试图在Akka stream中并行化一个流,但由于某些原因,我没有得到预期的结果。 我遵循了留档中列出的步骤,我不认为我错过了什么。然而,我的流的计算都是按顺序一个接一个地发生的。 我错过了什么? [1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html 示例输出 我希望看到两个计算同时进

  • 主要内容:并发控制的问题在并发控制中,可以同时执行多个事务。 它可能会影响事务结果。保持这些事务的执行顺序非常重要。 并发控制的问题 并发事务以不受控制的方式执行时可能会出现几个问题。 以下是并发控制中的三个问题。 更新丢失 脏读 不可重复读取 1. 更新丢失 当访问相同数据库项的两个事务包含其操作时,某些数据库项的值不正确,则会发生丢失的更新问题。 如果两个事务T1和T2读取记录然后更新它,那么第二个更新将覆盖更新第一

  • 配置样例 样例 1 限制 com.foo.BarService 的每个方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个: <dubbo:service interface="com.foo.BarService" executes="10" /> 样例 2 限制 com.foo.BarService 的 sayHello 方法,服务器端并发执行(或占用线程池线程数)不能超过 10

  • 如果一个语言要实现支持并发执行的接口,则一般来说需要在并发控制上下功夫,原因就是前面说的,由于虚拟机实现的细节问题,直接依赖宿主环境的并发容易出问题。简单地,以使用宿主的线程为例。假如源语言的线程对应宿主环境的真线程,那么同步操作就需要用到线程间的互斥量,比如锁,信号量等 一个程序需要并发,一般来说有三个原因: 一,为充分利用多核cpu资源,提高计算速度。这个原因是很重要,但在实际中其重要性我觉得

  • 问题内容: 我已经使用了Spring Security 3.0.7,并且正在我的项目中实现并发控制。但这是行不通的。我用过了 甚至我尝试了Spring安全参考中的解决方案,但没有成功。这是我的配置文件内容: 我收到以下异常: 有人可以帮忙解决这个问题吗? 问题答案: 如果您已经编写了和(您自己的实现),则应该重写Object 和方法。