本文演示以Spark作为分析引擎,Cassandra作为数据存储,而使用Spring Boot来开发驱动程序的示例。
1.前置条件
创建keyspace
CREATE KEYSPACE hfcb WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
创建table
CREATE TABLE person ( id text PRIMARY KEY, first_name text, last_name text );
插入测试数据
insert into person (id,first_name,last_name) values('1','wang','yunfei'); insert into person (id,first_name,last_name) values('2','peng','chao'); insert into person (id,first_name,last_name) values('3','li','jian'); insert into person (id,first_name,last_name) values('4','zhang','jie'); insert into person (id,first_name,last_name) values('5','liang','wei');
2.spark-cassandra-connector安装
让Spark-1.5.1能够使用Cassandra作为数据存储,需要加上下面jar包的依赖(示例将包放置于 /opt/spark/managed-lib/ 目录,可任意):
cassandra-clientutil-3.0.2.jar cassandra-driver-core-3.1.4.jar guava-16.0.1.jar cassandra-thrift-3.0.2.jar joda-convert-1.2.jar joda-time-2.9.9.jar libthrift-0.9.1.jar spark-cassandra-connector_2.10-1.5.1.jar
在 /opt/spark/conf 目录下,新建 spark-env.sh 文件,输入下面内容
SPARK_CLASSPATH=/opt/spark/managed-lib/*
3.Spring Boot应用开发
添加 spark-cassandra-connector 和 spark 依赖
<dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector_2.10</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.5.1</version> </dependency>
在 application.yml 中配置 spark 与 cassandra 路径
spark.master: spark://master:7077 cassandra.host: 192.168.1.140 cassandra.keyspace: hfcb
此处特别说明 spark://master:7077 是域名形式而不是ip地址,可修改本地 hosts 文件将 master 与 ip 地址映射。
配置 SparkContext 和 CassandraSQLContext
@Configuration public class SparkCassandraConfig { @Value("${spark.master}") String sparkMasterUrl; @Value("${cassandra.host}") String cassandraHost; @Value("${cassandra.keyspace}") String cassandraKeyspace; @Bean public JavaSparkContext javaSparkContext(){ SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", cassandraHost) // .set("spark.cassandra.auth.username", "cassandra") // .set("spark.cassandra.auth.password", "cassandra") .set("spark.submit.deployMode", "client"); JavaSparkContext context = new JavaSparkContext(sparkMasterUrl, "SparkDemo", conf); return context; } @Bean public CassandraSQLContext sqlContext(){ CassandraSQLContext cassandraSQLContext = new CassandraSQLContext(javaSparkContext().sc()); cassandraSQLContext.setKeyspace(cassandraKeyspace); return cassandraSQLContext; } }
简单调用
@Repository public class PersonRepository { @Autowired CassandraSQLContext cassandraSQLContext; public Long countPerson(){ DataFrame people = cassandraSQLContext.sql("select * from person order by id"); return people.count(); } }
启动即可如常规Spring Boot程序一样执行。
源码地址: https://github.com/wiselyman/spring-spark-cassandra.git
总结
以上所述是小编给大家介绍的Spring Boot与Spark、Cassandra系统集成开发示例,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对小牛知识库网站的支持!
我得到了一个错误:- 线程“main”java.lang.nosuchmethoderror:com.datastax.driver.core.queryoptions.setrefreshnodeintervalmillis(I)lcom/datastax/driver/core/queryoptions;**在com.datastax.spark.connector.cql.defaultCo
本文向大家介绍springboot集成mqtt的实践开发,包括了springboot集成mqtt的实践开发的使用技巧和注意事项,需要的朋友参考一下 序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。 maven 配置cl
本文向大家介绍将node.js与SAP HANA系统集成,包括了将node.js与SAP HANA系统集成的使用技巧和注意事项,需要的朋友参考一下 您可以使用node.js将数据插入HANA数据库。您还可以通过JDBC驱动程序连接到SAP HANA数据库。 要通过JDBC连接,您需要安装JDBC驱动程序ngdbc.jar。此驱动程序是作为SAP HANA客户端安装的一部分安装的。Ngdbc.jar
本文向大家介绍springboot集成redis实现简单秒杀系统,包括了springboot集成redis实现简单秒杀系统的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了springboot集成redis实现简单秒杀系统的具体代码,供大家参考,具体内容如下 项目是有地址的,我会放到文章的最后面 1. 直接service,我们会介绍两种秒杀模式 2. service实现类 3. con
新建一个项目 新建一个配置类,需要一个配置类,配置类里面需要装配好提供出去的类 使用EnableXXX注解或者spring.factory配置,将提供的类加入spring容器的管理 package com.clsaa.edu.springboot; import org.springframework.boot.SpringApplication; import org.springframew
我想把Nifi flowfile发送到Spark,在Spark中做一些转换,然后再把结果发送回Nifi,这样我就可以在Nifi中进行进一步的操作。我不想写flowfile写到数据库或HDFS,然后触发火花作业。我想直接将flowfile发送到Spark,并直接从Spark接收到NIFI的结果。我尝试在Nifi中使用ExecuteSparkInteractive处理器,但我被卡住了。任何例子都是有帮
Hyperledger Composer可以通过使用Loopback API与现有系统集成。集成现有系统使你可以从现有业务系统中提取数据,并将其转换为Composer业务网络中的资产或参与者。 生成一个REST API Hyperledger Composer包含独立的将业务网络暴露为REST API的Node.js进程。LoopBack框架用于生成由Swagger文档描述的Open API。 从
有人能解释一下DataStax Cassandra-Spark连接器最近的版本历史中发生了什么吗?