当前位置: 首页 > 面试题库 >

新的Dataframe列作为其他行的通用功能(火花)

陈成济
2023-03-14
问题内容

如何有效地创建一个新列 DataFrame ,它是其他行的功能 spark

这是spark我在这里描述的问题的一种实现:

from nltk.metrics.distance import edit_distance as edit_dist
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

d = {
    'id': [1, 2, 3, 4, 5, 6],
    'word': ['cat', 'hat', 'hag', 'hog', 'dog', 'elephant']
}

spark_df = sqlCtx.createDataFrame(pd.DataFrame(d))
words_list = list(spark_df.select('word').collect())

get_n_similar = udf(
    lambda word: len(
        [
            w for w in words_list if (w['word'] != word) and 
            (edit_dist(w['word'], word) < 2)
        ]
    ),
    IntegerType()
)

spark_df.withColumn('n_similar', get_n_similar(col('word'))).show()

输出:

+---+--------+---------+
|id |word    |n_similar|
+---+--------+---------+
|1  |cat     |1        |
|2  |hat     |2        |
|3  |hag     |2        |
|4  |hog     |2        |
|5  |dog     |1        |
|6  |elephant|0        |
+---+--------+---------+

这里的问题是我不知道一种方法,不能先将spark当前行与的其他行进行比较,Dataframe而不必先将值收集到中list。有没有一种方法可以应用其他行的泛型函数而无需调用collect


问题答案:

这里的问题是,我不知道一种方法,可以在不首先将值收集到列表中的情况下,告诉spark将当前行与Dataframe中的其他行进行比较。

UDF是不是一种选择,在这里(你不能引用分布DataFrameudf)你的逻辑的直接翻译是笛卡尔乘积和汇总:

from pyspark.sql.functions import levenshtein, col

result = (spark_df.alias("l")
    .crossJoin(spark_df.alias("r"))
    .where(levenshtein("l.word", "r.word") < 2)
    .where(col("l.word") != col("r.word"))
    .groupBy("l.id", "l.word")
    .count())

但实际上,您应该尝试做一些更有效的事情:ApacheSpark中的有效字符串匹配

根据问题,您应尝试查找其他近似值以避免完整的笛卡尔积。

如果要保留不匹配的数据,则可以跳过一个过滤器:

(spark_df.alias("l")
    .crossJoin(spark_df.alias("r"))
    .where(levenshtein("l.word", "r.word") < 2)
    .groupBy("l.id", "l.word")
    .count()
    .withColumn("count", col("count") - 1))

或(速度较慢,但​​通用性更高),请参考加入:

(spark_df
    .select("id", "word")
    .distinct()
    .join(result, ["id", "word"], "left")
    .na.fill(0))


 类似资料:
  • 问题内容: 在中创建 其他列中 最快的列的最快(最有效)方法 是 什么? 考虑以下示例: 产生: 假设我想创建一个新列,该列包含一个值,该值基于使用函数将当前行中的单词与中的其他行进行比较的输出而得出。 这的确产生了正确的输出,但是它使用了and ,这对于large而言并不是很有效。 有没有一种方法可以 矢量化 (正确的术语?)这种方法?还是有另一种更好(更快)的方法来做到这一点? 在原始帖子中,

  • 问题内容: 我想申请我的自定义函数(它使用的梯)这六个列我的数据帧的每一行中)。 我尝试了与其他问题不同的方法,但似乎仍然找不到适合我问题的正确答案。关键在于,如果该人被视为西班牙裔,就不能被视为其他任何人。即使他们在另一个种族栏中的得分为“ 1”,他们仍然被视为西班牙裔,而不是两个或两个以上的种族。同样,如果所有ERI列的总和大于1,则将它们计为两个或多个种族,并且不能计为唯一的种族(西班牙裔除

  • 8. 其他功能 8.1. 点名 点击菜单栏,选择点名,设置签到的时长后,可发起签到,查看学员的在线听课情况。 8.2. 布局切换 点击菜单栏,选择布局切换,可切换课堂布局,包括讲课模式、主视频模式、视频平铺模式,具体可参见第四章节。 8.3. 课堂设置 点击菜单栏,选择设置,可设置全体禁言、全体下麦、连麦方式、视频清晰度等。 全体禁言 禁止所有学员参与文字聊天。 全体关麦 关闭所有学员的麦克风,学

  • TestMain 在写测试时,有时需要在测试之前或之后进行额外的设置(setup)或拆卸(teardown);有时,测试还需要控制在主线程上运行的代码。为了支持这些需求,testing 提供了 TestMain 函数: func TestMain(m *testing.M) 如果测试文件中包含该函数,那么生成的测试将调用 TestMain(m),而不是直接运行测试。TestMain 运行在主 g

  • 除了上面提到的功能,使用Java 9,JDK平台还有很多增强功能。 其中一些列在下面。 GC(垃圾收集器)改进 Stack-Walking API 过滤传入的序列化数据 弃用Applet API Indify String Concatenation Enhanced Method Handles Java平台日志记录API和服务 紧凑的字符串 Nashorn的Parser API

  • Django的其他核心功能 ① 静态文件 ② csrf ③ 状态保持 ④ 中间件 ⑤ 密码管理

  • 问题内容: 我需要用一列更新表的每一行,作为同一表中其他两列的总和 像这样 UPDATE table1 SET table1.column1 =每行的总和(table1.column1 + table1.column2) 我试过了 因此,我可以通过首先选择所有rowId来迭代每个rowid来做到这一点 但是我需要在一个查询中处理该表中的所有行 当我尝试: 这将汇总column1和column2的所

  • 在我的scala程序中,我有一个dataframe,其中有两列和(类型都为)。除此之外,我有一个先前定义的对象和一些方法和属性。在这里,我想要使用dataframe的当前值和中的属性向dataframe添加一个新列。 例如,如果我有下面的dataframe: 谢谢你。