Using Yandex Object Storage in Yandex Data Processing
This section describes various ways that processes running in Yandex Data Processing clusters can access objects from Object Storage buckets.
Note
Configure a cluster network before setting up access to Yandex Cloud services and internet resources.
Component settings impact bucket file read and write performance:
- The settings specified when creating a cluster affect all the jobs running in the cluster.
- The settings specified when creating jobs override cluster-level settings and can be job-specific.
DistCp
To copy files from Object Storage to HDFS, use the DistCp utility.
To authenticate in Object Storage, you can use one of the following approaches:
- Use the IAM token of the cluster service account.
- Use CredentialProvider.
- Provide the access key and secret key parameters of static access keys when creating jobs.
Accessing S3 with authentication via the IAM token of a cluster service account
- When creating a cluster, specify a service account. If the cluster has already been created, add a service account using the Edit button in the management console.
Make sure to assign the following roles to the service account:
- dataproc.agent: To enable the service account to get info on cluster host states, jobs, and log groups.
- dataproc.provisioner: To enable the service account to work with an autoscaling instance group. This will enable subcluster autoscaling.
- The service account must have access to the appropriate bucket. To provide it, grant the service account permissions in the bucket ACL, or assign it the storage.viewer or storage.editor role. For more information about these roles, see the Object Storage documentation.
For example, to get a list of files located in the yc-mdb-examples public bucket at dataproc/example01/set01, connect to the cluster and run this command:
hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01
Result:
Found 12 items
-rw-rw-rw- 1 root root 19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
-rw-rw-rw- 1 root root 21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
-rw-rw-rw- 1 root root 20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/ ...
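If you post-process listings like the one above in a script, a small parser helps. The following is a minimal sketch: `parse_ls` and `LsEntry` are hypothetical helpers, and the code assumes the default `hadoop fs -ls` column layout shown in the result (permissions, replication, owner, group, size, date, time, path). It only parses text and does not call Hadoop.

```python
import re
from typing import List, NamedTuple


class LsEntry(NamedTuple):
    """One file line of `hadoop fs -ls` output."""
    permissions: str
    replication: str
    owner: str
    group: str
    size: int
    modified: str
    path: str


# Assumed columns: permissions, replication, owner, group, size, date, time, path
_LS_LINE = re.compile(
    r"^(?P<perm>[-d][-rwxsStT]{9}\+?)\s+(?P<repl>\S+)\s+(?P<owner>\S+)\s+"
    r"(?P<group>\S+)\s+(?P<size>\d+)\s+(?P<date>\d{4}-\d{2}-\d{2})\s+"
    r"(?P<time>\d{2}:\d{2})\s+(?P<path>\S.*)$"
)


def parse_ls(output: str) -> List[LsEntry]:
    """Parse `hadoop fs -ls` text, skipping the "Found N items" header."""
    entries = []
    for line in output.splitlines():
        match = _LS_LINE.match(line.strip())
        if match is None:
            continue  # header, blank line, or anything unexpected
        entries.append(LsEntry(
            permissions=match["perm"],
            replication=match["repl"],
            owner=match["owner"],
            group=match["group"],
            size=int(match["size"]),
            modified=f'{match["date"]} {match["time"]}',
            path=match["path"],
        ))
    return entries
```

For example, `sum(e.size for e in parse_ls(text))` gives the total size in bytes of the listed objects.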
Copying via CredentialProvider
To use a secret storage provider, place the secrets within the components that need access to Object Storage. To do this, you can use JCEKS (Java Cryptography Extension KeyStore).
In this example, you first create a file with secrets and then place it in HDFS:
- Specify the static key and the secret key, for example:
hadoop credential create fs.s3a.access.key \
-value <static_key> \
-provider localjceks://file/home/jack/yc.jceks && \
hadoop credential create fs.s3a.secret.key \
-value <secret_key> \
-provider localjceks://file/home/jack/yc.jceks
- Copy the secrets file to your local HDFS:
hdfs dfs -put /home/jack/yc.jceks /user/root/
- Copy the file from Object Storage directly to HDFS:
hadoop distcp \
-D fs.s3a.bucket.yc-mdb-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://<HDFS_host>/<path>/
Where <HDFS_host> is your target HDFS server. You can get the default server using this command:
hdfs getconf -confKey fs.defaultFS
Example of the command to copy files from the bucket:
hadoop distcp \
-D fs.s3a.bucket.yc-mdb-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
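When DistCp runs are launched from a script, it can help to assemble the command as an argument list rather than a shell string. The sketch below is a hypothetical helper, `build_distcp_argv`, that mirrors the commands in this section. It derives the bucket name from the source URI because S3A per-bucket options follow the `fs.s3a.bucket.<bucket>.<option>` pattern and only apply to the bucket they name. Omitting both credential options and relying on the cluster service account is an assumption based on the IAM token approach described earlier.

```python
from typing import List, Optional
from urllib.parse import urlparse


def build_distcp_argv(src: str, dst: str, *,
                      provider_path: Optional[str] = None,
                      access_key: Optional[str] = None,
                      secret_key: Optional[str] = None,
                      endpoint: str = "storage.yandexcloud.net",
                      list_threads: int = 10) -> List[str]:
    """Assemble a `hadoop distcp` argv list like the commands in this section."""
    parsed = urlparse(src)
    if parsed.scheme != "s3a" or not parsed.netloc:
        raise ValueError(f"expected an s3a://<bucket>/... source, got {src!r}")
    # Per-bucket S3A options only take effect for the bucket named in `src`
    prefix = f"fs.s3a.bucket.{parsed.netloc}"
    argv = ["hadoop", "distcp", "-D", f"{prefix}.endpoint={endpoint}"]
    if provider_path is not None:
        # Keys are read from the JCEKS file created with `hadoop credential create`
        argv += ["-D", f"hadoop.security.credential.provider.path={provider_path}"]
    elif access_key and secret_key:
        # Keys end up on the command line, e.g. visible in the process list
        argv += ["-D", f"{prefix}.access.key={access_key}",
                 "-D", f"{prefix}.secret.key={secret_key}"]
    # Otherwise no keys are passed (assumed: the cluster service account is used)
    argv += ["-update", "-skipcrccheck",
             "-numListstatusThreads", str(list_threads), src, dst]
    return argv
```

The resulting list can be passed to `subprocess.run(argv, check=True)` on a cluster host, which avoids shell quoting issues with paths and keys.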
Copying files by passing keys in arguments
Instead of creating a secrets file, you can pass keys in the command arguments:
hadoop distcp \
-D fs.s3a.bucket.yc-mdb-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.yc-mdb-examples.access.key=<static_key> \
-D fs.s3a.bucket.yc-mdb-examples.secret.key=<secret_key> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Optimizing file reads from Object Storage
The method for reading data from a bucket is determined by the fs.s3a.experimental.input.fadvise setting. Its value depends on the image version used:
- Image versions 1.0 through 1.4 use sequential as the default value. It is a good choice for sequential file reads but is slow for random access. If you use random file access more often, set the value to random in the cluster component properties or job settings.
- For version 2.0 images, the default is normal: files are accessed sequentially; however, if an application performs random access operations, random mode activates automatically.
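The version rule above can be captured in a few lines, for example when generating cluster properties from a template. This is a sketch with hypothetical helper names; the `core:` prefix for the cluster property is an assumption that follows the notation used for other Hadoop settings on this page. Images not listed above raise an error rather than guessing a default.

```python
from typing import Dict


def default_fadvise(image_version: str) -> str:
    """Default fs.s3a.experimental.input.fadvise value for an image version."""
    major, minor = (int(part) for part in image_version.split(".")[:2])
    if major == 1 and minor <= 4:
        return "sequential"
    if (major, minor) == (2, 0):
        return "normal"  # switches to random automatically on random access
    raise ValueError(f"no default listed for image {image_version}")


def fadvise_override(image_version: str, mostly_random: bool) -> Dict[str, str]:
    """Property to add when random reads dominate and the default is sequential."""
    if mostly_random and default_fadvise(image_version) == "sequential":
        return {"core:fs.s3a.experimental.input.fadvise": "random"}
    return {}  # the default already suits the workload
```

On 2.0 images the helper returns no override, since normal mode already adapts to random access.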
For more information on the component versions used, see Runtime environment.
Optimizing file writes to Object Storage
To speed up file writes to Object Storage, you can use S3A committers and adjust the Apache Hadoop and Apache Spark settings.
Using S3A committers
S3A committers are Apache Hadoop software modules used for writing data to object storage over the S3 protocol to ensure efficient and near-atomic commits of the changes made. For more information, see the Apache Hadoop documentation.
Note
S3A committers are not used or required for operations with tables that are managed using Delta Lake tools.
S3A committers run in three basic modes:
Mode | Environment | HDFS required | Writing data to partitioned tables | Write speed |
---|---|---|---|---|
directory | MapReduce, Spark | Yes* | Complete overwrite | Standard |
magic | MapReduce, Spark | No (data is written directly to S3) | Not supported | Maximum |
partitioned | Spark | Yes* | Replacing partitions and appending them | Standard |
* In directory and partitioned modes, no checks are performed for the actual presence of HDFS for storing intermediate data. Some jobs may complete successfully without HDFS. However, complex jobs may then fail with issues such as "file not found" errors or incomplete uploads of job results to Object Storage.
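The table can be read as a small decision procedure. The sketch below encodes it with a hypothetical `choose_committer` function; preferring magic whenever no partitioned table is written is a judgment call based on its "Maximum" write speed, not a rule stated on this page.

```python
from typing import Optional


def choose_committer(engine: str, has_hdfs: bool,
                     partitioned_write: Optional[str] = None) -> str:
    """Pick an S3A committer mode according to the table above.

    engine: "spark" or "mapreduce".
    partitioned_write: None for non-partitioned output, "overwrite" to fully
    rewrite a partitioned table, "replace_or_append" to touch single partitions.
    """
    if engine not in ("spark", "mapreduce"):
        raise ValueError(f"unknown engine {engine!r}")
    if partitioned_write is None:
        return "magic"  # no HDFS needed and the fastest writes
    if partitioned_write not in ("overwrite", "replace_or_append"):
        raise ValueError(f"unknown partitioned_write {partitioned_write!r}")
    if not has_hdfs:
        # magic does not support partitioned tables; the other modes need HDFS
        raise ValueError("directory and partitioned modes require HDFS")
    if partitioned_write == "overwrite":
        return "directory"
    if engine != "spark":
        raise ValueError("partitioned mode is available only in Spark")
    return "partitioned"
```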
To enable S3A committers, configure the following settings:
- core:fs.s3a.committer.magic.enabled : true if jobs will use the magic mode.
- core:fs.s3a.committer.name with directory, magic, or partitioned as the default mode.
- core:fs.s3a.committer.staging.abort.pending.uploads : false for Hadoop 3.2.2 as part of Yandex Data Processing image 2.0, or core:fs.s3a.committer.abort.pending.uploads : false for Hadoop 3.3.2 as part of image 2.1, if multiple concurrent jobs are writing data to the same table.
- core:mapreduce.outputcommitter.factory.scheme.s3a : org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
- spark:spark.hadoop.fs.s3a.committer.name with directory, magic, or partitioned as the default mode.
- spark:spark.sql.parquet.output.committer.class : org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
- spark:spark.sql.sources.commitProtocolClass : org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
- (Optional) core:fs.s3a.committer.staging.conflict-mode: Action if existing data partitions are found in the target table (for partitioned mode):
  - append: Append new data to an existing partition.
  - fail: When attempting to overwrite an existing partition, the job terminates with an error.
  - replace: Existing partition data is replaced with new partition data.
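The list above can be turned into a property set for a given mode and image. This is a sketch: `s3a_committer_properties` is a hypothetical helper, and it enables magic mode only when magic is the default, whereas the list above asks for that flag whenever any job uses magic.

```python
from typing import Dict, Optional

S3A_COMMITTER_MODES = ("directory", "magic", "partitioned")


def s3a_committer_properties(mode: str, image_version: str,
                             concurrent_writers: bool = False,
                             conflict_mode: Optional[str] = None) -> Dict[str, str]:
    """Cluster properties that enable S3A committers, per the list above."""
    if mode not in S3A_COMMITTER_MODES:
        raise ValueError(f"unknown committer mode {mode!r}")
    props = {
        "core:fs.s3a.committer.name": mode,
        "core:mapreduce.outputcommitter.factory.scheme.s3a":
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
        "spark:spark.hadoop.fs.s3a.committer.name": mode,
        "spark:spark.sql.parquet.output.committer.class":
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
        "spark:spark.sql.sources.commitProtocolClass":
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    }
    if mode == "magic":
        props["core:fs.s3a.committer.magic.enabled"] = "true"
    if concurrent_writers:
        # The property name differs between the Hadoop versions of images 2.0 and 2.1
        abort_keys = {
            "2.0": "core:fs.s3a.committer.staging.abort.pending.uploads",  # Hadoop 3.2.2
            "2.1": "core:fs.s3a.committer.abort.pending.uploads",          # Hadoop 3.3.2
        }
        if image_version not in abort_keys:
            raise ValueError(f"no abort key listed for image {image_version}")
        props[abort_keys[image_version]] = "false"
    if conflict_mode is not None:
        if mode != "partitioned" or conflict_mode not in ("append", "fail", "replace"):
            raise ValueError("conflict_mode is append, fail, or replace, for partitioned mode")
        props["core:fs.s3a.committer.staging.conflict-mode"] = conflict_mode
    return props
```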
The current S3A committer mode can be overridden for a specific job by setting fs.s3a.committer.name and spark.hadoop.fs.s3a.committer.name to directory, magic, or partitioned as required.
Do not change the default value of spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version, because Yandex Object Storage does not support atomic folder renames.
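The override rule (job settings take precedence over cluster settings) can be illustrated with a plain dictionary merge. In this sketch, the helper names are hypothetical, and the list of component prefixes stripped from cluster property names (such as `core:` and `spark:`) is an assumption. The merge also refuses to touch the committer algorithm version, following the warning above.

```python
from typing import Dict

# Assumed component prefixes used in cluster-level property names
COMPONENT_PREFIXES = ("core", "hdfs", "yarn", "mapreduce", "spark", "hive")
PROTECTED_KEY = "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"


def strip_component(key: str) -> str:
    """'core:fs.s3a.committer.name' -> 'fs.s3a.committer.name'."""
    component, sep, rest = key.partition(":")
    return rest if sep and component in COMPONENT_PREFIXES else key


def effective_settings(cluster: Dict[str, str], job: Dict[str, str]) -> Dict[str, str]:
    """Job-level settings override cluster-level settings with the same name."""
    if PROTECTED_KEY in {strip_component(k) for k in job}:
        raise ValueError("keep the default: Object Storage has no atomic folder renames")
    merged = {strip_component(k): v for k, v in cluster.items()}
    merged.update({strip_component(k): v for k, v in job.items()})
    return merged


def committer_override(mode: str) -> Dict[str, str]:
    """Job settings that switch the committer mode for a single job."""
    if mode not in ("directory", "magic", "partitioned"):
        raise ValueError(f"unknown committer mode {mode!r}")
    # magic mode also needs core:fs.s3a.committer.magic.enabled at cluster level
    return {"fs.s3a.committer.name": mode, "spark.hadoop.fs.s3a.committer.name": mode}
```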
Apache Hadoop settings
The method of writing data to an Object Storage bucket is determined by the core:fs.s3a.fast.upload setting. Its value depends on the image version used:
- Image versions 1.0 through 1.4 use false as the default value to save RAM. Set it to true in the cluster component properties or job settings. This will improve bucket write performance for large files and prevent node storage from filling up.
- In image 2.0, fs.s3a.fast.upload is enabled by default.
If required, configure other settings:
- fs.s3a.committer.threads: Number of threads committing changes in Object Storage towards the end of the job.
- fs.s3a.connection.maximum: Number of allowed Object Storage connections.
- fs.s3a.connection.timeout: Maximum Object Storage connection timeout in milliseconds.
- fs.s3a.fast.upload.active.blocks: Maximum number of blocks a single output stream can have active (uploading or queued for upload).
- fs.s3a.fast.upload.buffer: Type of buffer used for temporary storage of the upload data:
  - disk: Save data to the folder specified in fs.s3a.buffer.dir.
  - array: Use arrays in the JVM heap.
  - bytebuffer: Use RAM outside the JVM heap.
- fs.s3a.max.total.tasks: Maximum number of Object Storage bucket operations that can be queued while waiting for a free thread once the thread limit is reached.
- fs.s3a.multipart.size: Size of the chunks, in bytes, into which data is split when copying or uploading it to a bucket.
- fs.s3a.threads.max: Number of threads in the AWS Transfer Manager.
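To get a feel for how these settings translate into memory use with the array or bytebuffer buffers, a back-of-the-envelope estimate can help. This is a rough sketch, not an exact Hadoop formula: it assumes each active block holds at most one full `fs.s3a.multipart.size` part, and that size suffixes such as `64M` use binary units. The helper names are hypothetical.

```python
_UNITS = {"K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}


def parse_size(value) -> int:
    """Parse a byte count such as 104857600 or '64M' (binary units assumed)."""
    text = str(value).strip().upper()
    if text and text[-1] in _UNITS:
        return int(text[:-1]) * _UNITS[text[-1]]
    return int(text)


def upload_buffer_bytes(buffer_type: str, active_blocks: int,
                        multipart_size, open_streams: int = 1) -> int:
    """Rough upper bound of RAM held by S3A upload buffers."""
    if buffer_type == "disk":
        return 0  # blocks are buffered in files under fs.s3a.buffer.dir
    if buffer_type not in ("array", "bytebuffer"):
        raise ValueError(f"unknown fs.s3a.fast.upload.buffer value {buffer_type!r}")
    # Assumption: each active block holds at most one full multipart part
    return active_blocks * parse_size(multipart_size) * open_streams
```

For example, 4 active blocks of 64 MiB across 2 open streams come to 512 MiB, which shows how quickly larger values add up on cluster hosts.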
Note
Large values of these parameters may increase the consumption of computing resources on Yandex Data Processing cluster hosts.
For more information, see the Apache Hadoop documentation.
Apache Spark settings
When accessing data in Object Storage from Spark jobs, we recommend setting spark.sql.hive.metastorePartitionPruning to true.
When working with data in Parquet format, the following Spark job settings are recommended:
spark.hadoop.parquet.enable.summary-metadata : false
spark.sql.parquet.mergeSchema : false
spark.sql.parquet.filterPushdown : true
When working with data in Parquet format and using dynamic partition overwrites, the following settings are recommended:
spark:spark.sql.sources.partitionOverwriteMode : dynamic
spark:spark.sql.parquet.output.committer.class : org.apache.parquet.hadoop.ParquetOutputCommitter
spark:spark.sql.sources.commitProtocolClass : org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
When working with data in ORC format, the following Spark job settings are recommended:
spark.sql.orc.cache.stripe.details.size : 10000
spark.sql.orc.filterPushdown : true
spark.sql.orc.splits.include.file.footer : true
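To apply these recommendations to a single job, they can be rendered as `spark-submit --conf key=value` arguments. The sketch below uses hypothetical helper names and assumes that the `spark:` prefix in the dynamic overwrite settings is cluster-property notation, so it is dropped for job-level use.

```python
from typing import Dict, List

COMMON = {"spark.sql.hive.metastorePartitionPruning": "true"}
PARQUET = {
    "spark.hadoop.parquet.enable.summary-metadata": "false",
    "spark.sql.parquet.mergeSchema": "false",
    "spark.sql.parquet.filterPushdown": "true",
}
PARQUET_DYNAMIC_OVERWRITE = {
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    "spark.sql.parquet.output.committer.class":
        "org.apache.parquet.hadoop.ParquetOutputCommitter",
    "spark.sql.sources.commitProtocolClass":
        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
}
ORC = {
    "spark.sql.orc.cache.stripe.details.size": "10000",
    "spark.sql.orc.filterPushdown": "true",
    "spark.sql.orc.splits.include.file.footer": "true",
}


def recommended_conf(fmt: str, dynamic_overwrite: bool = False) -> Dict[str, str]:
    """Recommended Spark settings for reading and writing a given format."""
    if fmt == "parquet":
        settings = {**COMMON, **PARQUET}
        if dynamic_overwrite:
            settings.update(PARQUET_DYNAMIC_OVERWRITE)
    elif fmt == "orc":
        if dynamic_overwrite:
            raise ValueError("dynamic overwrite settings are listed for Parquet only")
        settings = {**COMMON, **ORC}
    else:
        raise ValueError(f"unsupported format {fmt!r}")
    return settings


def as_spark_submit_args(settings: Dict[str, str]) -> List[str]:
    """Render settings as repeated `--conf key=value` arguments."""
    args: List[str] = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args
```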
Jobs that create or update large numbers (hundreds or thousands) of table partitions may spend a long time updating partition records in the Hive Metastore cluster. To speed up this process, increase the following values:
- hive:datanucleus.connectionPool.maxPoolSize: Maximum size of the Metastore DB connection pool.
- hive:hive.metastore.fshandler.threads: Number of threads running background file system operations within Metastore.
- spark:spark.sql.addPartitionInBatch.size: Number of partitions updated per Metastore call. The optimal value is 10 × <hive:hive.metastore.fshandler.threads_setting_value> or higher.
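The batch size rule is simple arithmetic, but deriving it from the thread count keeps the two values in sync. In this sketch, the helper names are hypothetical and the pool size is left to you, since this page does not recommend a specific value.

```python
from typing import Dict


def add_partition_batch_size(fshandler_threads: int, factor: int = 10) -> int:
    """Batch size of at least 10 x hive.metastore.fshandler.threads."""
    if fshandler_threads < 1 or factor < 10:
        raise ValueError("threads must be positive and factor at least 10")
    return factor * fshandler_threads


def metastore_tuning(fshandler_threads: int, max_pool_size: int) -> Dict[str, str]:
    """Cluster properties for jobs that update many partitions."""
    return {
        "hive:datanucleus.connectionPool.maxPoolSize": str(max_pool_size),
        "hive:hive.metastore.fshandler.threads": str(fshandler_threads),
        "spark:spark.sql.addPartitionInBatch.size":
            str(add_partition_batch_size(fshandler_threads)),
    }
```

For example, with 15 Metastore file system threads the batch size comes out to at least 150.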
Note
If these values are too high, Metastore may run out of system resources. A large Metastore database connection pool may also require changing the database settings and adding computing resources to the cluster.
For more information, see the Apache Spark documentation.
Using s3fs
s3fs lets you mount Object Storage buckets via FUSE. For more information about using the utility, see s3fs.
Using Object Storage from Spark
In Spark Shell (Scala), implement the desired access option:
- Using JCEKS:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net")
sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<path_to_JCEKS_file>")
- Using your access key and secret key:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net")
sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
sc.hadoopConfiguration.set("fs.s3a.access.key", "<access_key>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<bucket_secret>")
You can then read the file from Object Storage:
// In spark-shell, the SparkSession is predefined as spark
val df = spark.read.parquet("s3a://<bucket_name>/<object_path>")
In PySpark Shell, select the access method:
- Using JCEKS:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<path_to_JCEKS_file>")
- Using an access key and secret key:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<access_key>")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<bucket_secret>")
Once you have access, you can read the file directly from Object Storage:
# In the PySpark shell, the SparkSession is predefined as spark
df = spark.read.parquet("s3a://<bucket_name>/<object_path>")
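In PySpark, the repeated `set` calls can be collected into one helper so that both access options share the same base settings. This is a minimal sketch with hypothetical names: `s3a_settings` builds the same keys as the PySpark examples in this section, and `apply_settings` accepts any object with a `set(key, value)` method, such as `sc._jsc.hadoopConfiguration()`, which keeps the logic testable without a cluster.

```python
from typing import Dict, Optional

BASE = {
    "fs.s3a.endpoint": "storage.yandexcloud.net",
    "fs.s3a.signing-algorithm": "",
    "fs.s3a.aws.credentials.provider":
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
}


def s3a_settings(jceks_path: Optional[str] = None,
                 access_key: Optional[str] = None,
                 secret_key: Optional[str] = None) -> Dict[str, str]:
    """Hadoop settings for either a JCEKS file in HDFS or static keys."""
    settings = dict(BASE)
    if jceks_path is not None:
        # e.g. "/user/root/yc.jceks" -> "jceks://hdfs/user/root/yc.jceks"
        settings["hadoop.security.credential.provider.path"] = (
            "jceks://hdfs/" + jceks_path.lstrip("/"))
    elif access_key and secret_key:
        settings["fs.s3a.access.key"] = access_key
        settings["fs.s3a.secret.key"] = secret_key
    else:
        raise ValueError("pass jceks_path or both access_key and secret_key")
    return settings


def apply_settings(hadoop_conf, settings: Dict[str, str]) -> None:
    """Call hadoop_conf.set(key, value) for every setting."""
    for key, value in settings.items():
        hadoop_conf.set(key, value)
```

In the PySpark shell, usage could look like `apply_settings(sc._jsc.hadoopConfiguration(), s3a_settings(jceks_path="/user/root/yc.jceks"))`, followed by `spark.read.parquet(...)` as shown above.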
For more information, see Spark settings for integration with Yandex Object Storage.