MapR Data Platform offers significant advantages over any other tool on the big data space. MapR-DB is one of the core components of the platform and it offers state of the art capabilities that blow away most of the NoSQL databases out there. An important add-on to MapR-DB is the ability to use, for writing and querying, Apache Spark through the Using this connector comes very handy since it can read and write from spark to MapR-DB using the different Spark APIs such as RDDs, DataFrames, and Streams. Connector for Apache Spark . Using the connector we can issue queries like the following one. The resulting type is a that we can use as any other Dataframe from any other source, as we normally do in Spark. Dataframe If we then filter our data set out, problems start to emerge. For instance, let’s look at the following query. The filter is being pushed down, so MapR-DB does the filtering and only sends back the data that complies with the filter reducing the amount of data transferred between MapR-DB and Spark. However, if there is an index created on the field first-name, the index is ignored and the table is fully scanned, trying to find the rows that comply with the filter. By having an index on a field, we expect to use it so queries on that fields are optimized, ultimately speeding up the computation. The provided connector is simply not using this capability. Necessity Our team, MapR Professional Services, knows that filtering using MapR-DB secondary indexes is huge for performance and since many of our customers do actually try to take advantages of this feature (secondary indexes) we have taken different approaches in order to force the use of the indexes when using Spark. The following post was written by a fellow coworker, where he explains some ways to overcome the issue on hand. How to use secondary indexes in Spark with OJAI , Even when we can take some shortcuts, we have to give up some of the of nice constructs the default connector has such as . .loadFromMapRDB(...) An Independent Connector In the past, I have extended Apache Spark in many ways. I have written my own and most recently a . too Custom Data Sources Custom Streaming Source for Spark Structured Streams Once again, I have sailed in the adventure to write my own Spark Data Source, but this time for MapR-DB so we leverage the full advantages of secondary indexes while keeping the same API the Current MapR-DB Connector for Apache Spark has. At the end of this post, we will be able to write a query in the following way while fully using secondary indexes. Spark Data Sources Version 2 The following data source implementation uses spark 2.3.1 and uses the data source API V2. Let’s start by looking at the things we need. , allows us to create a DataSourceReader. ReadSupportWithSchema , allows us to get the schema for our data while we need to specify how to create a . DataSourceReader DataReaderFactory , allows us to intercept the query filters so we can push them down to MapR-DB. SupportsPushDownFilters , allows us to intercept the query projections so we can push them down to MapR-DB. SupportsPushDownRequiredColumns Let’s start by implementing . ReadSupportWithSchema As we can see, we simply get the table path and the schema we want to use when reading from MapR-DB. Then we pass them to . MapRDBDataSourceReader MapRDBDataSourceReader implements and we are also mixing in and to indicate that we want to push filters and projections down to MapR-DB. MapRDBDataSourceReader DataSourceReader SupportsPushDownFilters SupportsPushDownRequiredColumns The variable will hold the schema we want to project if any. In case we don’t explicitly project fields by doing we will project all the fields on the variable. projections .select schema works in conjunction with and . If in our Spark query we specify a then the selected fields are passed to and those are the only fields we will bring from MapR-DB. **readSchema** projections pruneColumns select pruneColumns indicates what filters we have specified in the or clause in our Spark query. Basically, we have to decide which of those we want to push down to MapR-DB, the other ones will be applied by Spark after the data is in memory. **pushFilters** where filter In the snippet above, we are indicating we are going to push down only two types of filters, and Any other filter besides these two will not be pushed down and the filtering will happen in memory (spark memory) after the data is loaded from MapR-DB. EqualTo GreaterThan . We are working on adding more filters to match the current MapR-DB Connector. creates a list of data readers that actually do the heavy work of reading from our source, MapR-DB. In our case, we are only creating one data reader, but ideally, we have one reader for each MapR-DB region/partition so we can take advantage of parallelism offered by MapR-DB. **createDataReaderFactories** MapRDBDataReaderFactory We are almost done, yet, the most important parts is about to come. The is where we actually build the MapR-DB query and execute it again our MapR-DB table. Notice we are passing the table we are going to read from, the filters and projections we want to push down. **MapRDBDataReaderFactory** Now we need to connect to MapR-DB by opening a connection and creating a document store object. builds the query condition we want to execute against MapR-DB. This is the most important part of our entire implementation. **createFilterCondition** In here we are combining all the filters. As we can see, we are implementing our two supported filters only for two data types, but we are working on extending this implementation to match the current MapR-DB Connector. creates the final command to be sent to MapR-DB. This task is a matter of applying the query condition and the projections to our object. **query** **connection** It is very important to notice that since we are using , it will automatically use any secondary indexes for fields that are part of the filters we are applying. Make sure you check the output at the end of this post. OJAI is a stream of data coming from MapR-DB based on . **documents** **query** uses the stream we have created ( ) to do the actual reading and returning the data back to Spark. **createDataReader** **documents** Using our Connector At this point, we are ready to plug in our custom data source into spark in the following way. This allows us to use our own way to read from MapR-DB so that any filter being applied that is part of a secondary index on the physical table will be used to optimize the reading. Syntax In order to maintain a similar API to the one offered by the default MapR-DB Connector, we added some syntax to our library in the following way. We can now use our connector in the same way we used to use the default connector. Using MapR-DB Secondary Indexes When we run the code above, the output from looks similar to the following. TRACE OJAI Notice that it automatically uses the index called which is an index for the field that at the same time is the field being used in the spark filter. uid_idx uid Conclusions MapR-DB is a powerful tool that runs as part of the MapR Data Platform. The Spark Connector offers an interesting way to interact with MapR-DB since it allows us to use all Spark constructs at scale when working with this NoSQL system. However, some times the default connector falls short because it does not use the secondary index capabilities of MapR-DB when we need them the most. On the other hand, our implementation mimics the Connector API and ensures that the implemented Spark data source uses MapR-DB secondary indexes since it relies on pure OJAI queries that are able to support secondary indexes out of the box. Our library code can be found here MapRDBConnector . Disclaimer: This is an independent effort to improve querying MapR-DB. This library is not a substitute to the official Connector for Apache Spark offered by MapR as part of its distribution.