SnackFS@Calliope: using Cassandra as storage for Spark Streaming checkpoints #2

@chinmaya24x7

Hi,

I need to use Apache Cassandra as the Spark Streaming checkpoint directory, because we don't want to use HDFS.
I went through the documentation for Calliope-SnackFS.
Please help me figure out the points below for SnackFS:

  1. I am trying to use the snackfs_2.10.3-0.6.2-C2-EA version. It comes with Cassandra 2.0.x.
    In my use case I have installed Cassandra 2.2.5 and Spark 1.6.3. Will the same build of SnackFS work with these higher versions of Cassandra and Spark? If not, is there any way to upgrade SnackFS to work with the higher versions of these tools?
  2. From the documentation, to use SnackFS as the checkpoint directory:

ssc.checkpoint("snackfs://path/to/checkpoint/dir")

A. I have a 3-node Cassandra cluster.
Do I need to install SnackFS on all 3 nodes, and on each node modify core-site.xml to point to the local Cassandra node? Please let me know the correct setup.

B. If I want to use SnackFS for the Spark checkpoint, what directory URL/path should be used when Spark is running on a different host than Cassandra?
E.g. my Spark is running on host x123.com and Cassandra on a123.com, b123.com, c123.com.
How should we give the checkpoint path then:
ssc.checkpoint("snackfs://a123.com:9160/sparkcheckpoint")?
Is the above URL correct?
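
To make the two URL forms in this question concrete, here is a minimal sketch of how a generic URI parser splits them. It uses only Python's standard `urllib.parse` and is not SnackFS's own parsing; `split_checkpoint_uri` is a hypothetical helper name. Whether SnackFS takes the Cassandra host and port from the URI at all, or only from core-site.xml, is exactly what is being asked here.

```python
from urllib.parse import urlsplit


def split_checkpoint_uri(uri):
    """Split a checkpoint URI into scheme, host, port and path.

    Hypothetical helper for illustration only; it applies generic URI
    rules and says nothing about how SnackFS interprets the parts.
    """
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,  # authority part, without the port
        "port": parts.port,      # None when the URI carries no port
        "path": parts.path,
    }


if __name__ == "__main__":
    # The form tried in this issue: host and Thrift port in the authority.
    print(split_checkpoint_uri("snackfs://a123.com:9160/sparkcheckpoint"))
    # The form quoted from the documentation.
    print(split_checkpoint_uri("snackfs://path/to/checkpoint/dir"))
```

Under generic URI rules the first form yields host `a123.com`, port `9160` and path `/sparkcheckpoint`, while in the documentation form the first segment `path` lands in the host position and the path is `/to/checkpoint/dir`. So the two forms are not equivalent as URIs, and it matters which one SnackFS expects.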

C. Assuming the way to specify the Spark checkpoint is snackfs://hostname-cassandra:thriftserverport/path
Architecture: Spark sends the RDDs to the specified Cassandra host, and then the other nodes sync the data from that host?
Is my understanding of the architecture correct?

D. Making the same assumption as in point C, if the specified Cassandra node goes down, will Spark Streaming keep working and send data to the other Cassandra nodes?

I tried to use ssc.checkpoint("snackfs://a123.com:9160/sparkcheckpoint"), but I am getting the error below:

16/07/23 05:58:45 ERROR ThriftStore: failed to retrieve Inode for path snackfs://a123.com:9160/sparkcheckpoint
NotFoundException()
at org.apache.cassandra.thrift.Cassandra$get_result$get_resultStandardScheme.read(Cassandra.java:10379)
at org.apache.cassandra.thrift.Cassandra$get_result$get_resultStandardScheme.read(Cassandra.java:10347)
at org.apache.cassandra.thrift.Cassandra$get_result.read(Cassandra.java:10262)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_get(Cassandra.java:633)
at org.apache.cassandra.thrift.Cassandra$AsyncClient$get_call.getResult(Cassandra.java:2019)
at com.tuplejump.snackfs.cassandra.store.ThriftStore$$anonfun$com$tuplejump$snackfs$cassandra$store$ThriftStore$$performGet$1.applyOrElse(ThriftStore.scala:310)
at com.tuplejump.snackfs.cassandra.store.ThriftStore$$anonfun$com$tuplejump$snackfs$cassandra$store$ThriftStore$$performGet$1.applyOrElse(ThriftStore.scala:307)
at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Regards,
Chinmaya Nanda
