Class: OCI::ObjectStorage::Transfer::Multipart::MultipartObjectAssembler

Inherits:
Object
Defined in:
lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb

Overview

MultipartObjectAssembler simplifies uploading large objects using multipart uploads.

An assembler can be used to begin a new upload or to resume a previous one. Create a new assembler for each new or resumed upload: the same assembler is not reusable across multiple new uploads or resumes.
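The lifecycle implied by the overview can be sketched with only the methods documented on this page: set the data source, start the upload, upload the parts, then commit or abort. The helper name `run_multipart_upload` is hypothetical, and aborting on failure is just one possible policy (the upload could instead be left in place and resumed later). The assembler is passed in, so the sketch assumes nothing about credentials or client configuration.

```ruby
# Hypothetical helper (not part of the SDK): drives one assembler through a
# complete new upload using only methods documented on this page.
def run_multipart_upload(assembler, source)
  assembler.io_for_transfer = source # plan the parts; nothing is sent yet
  assembler.new_upload               # create the multipart upload on the service
  errors = assembler.upload          # an empty array means every part succeeded

  return assembler.commit if errors.empty?

  # Assumed policy: discard the partial upload rather than resume it later.
  assembler.abort(assembler.manifest[:upload_id])
  raise errors.first
end
```

In real use the `assembler` argument would be a freshly constructed `MultipartObjectAssembler`, one per upload, as the overview requires.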

Constant Summary

MD5_CALC_PART_READ_BYTES =
8 * OCI::ObjectStorage::Transfer::MEBIBYTE

Settings for the exponential backoff and retry (with jitter) that the assembler performs:

DEFAULT_MAX_ATTEMPTS =
3
DEFAULT_BASE_SLEEP_MILLIS =
1000
DEFAULT_MAX_SLEEP_TIME_MILLIS =
8000
DEFAULT_EXPONENTIAL_GROWTH_FACTOR =
2
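
To make the retry settings concrete, here is a minimal sketch of how such constants are commonly combined. The retry routine itself is not shown on this page, so the formula (base times growth factor raised to the attempt number, capped at the maximum, with "full jitter") and both helper names are assumptions, not the SDK's confirmed behavior.

```ruby
# Values copied from the constant summary above.
DEFAULT_MAX_ATTEMPTS = 3
DEFAULT_BASE_SLEEP_MILLIS = 1000
DEFAULT_MAX_SLEEP_TIME_MILLIS = 8000
DEFAULT_EXPONENTIAL_GROWTH_FACTOR = 2

# Hypothetical helper: upper bound on the sleep before the next retry.
# Assumed formula: base * factor**attempt, capped at the maximum.
def backoff_ceiling_millis(attempt,
                           base: DEFAULT_BASE_SLEEP_MILLIS,
                           factor: DEFAULT_EXPONENTIAL_GROWTH_FACTOR,
                           max: DEFAULT_MAX_SLEEP_TIME_MILLIS)
  [base * (factor**attempt), max].min
end

# Hypothetical helper: "full jitter" picks a random delay in 0..ceiling so
# that concurrent clients do not retry in lockstep.
def jittered_sleep_millis(attempt, rng: Random.new)
  rng.rand(0..backoff_ceiling_millis(attempt))
end
```

Under these assumptions the ceilings for attempts 1 to 3 are 2000, 4000 and 8000 ms, and any later attempt stays at the 8000 ms cap.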

Constructor Details

#initialize(object_storage_client:, namespace:, bucket_name:, object_name:, multipart_part_size: OCI::ObjectStorage::Transfer::MULTIPART_PART_SIZE, non_file_io_multipart_part_size: OCI::ObjectStorage::Transfer::NON_FILE_IO_MULTIPART_PART_SIZE, parallel_process_count: OCI::ObjectStorage::Transfer::Multipart::DEFAULT_PARALLEL_PROCESS_COUNT, multipart_upload_opts: {}) ⇒ MultipartObjectAssembler

Returns a new instance of MultipartObjectAssembler.

Parameters:

  • object_storage_client (OCI::ObjectStorage::ObjectStorageClient)

    The client used to interact with the Object Storage service

  • namespace (String)

    The namespace containing the bucket in which to store the object

  • bucket_name (String)

    The bucket where we'll upload the object

  • object_name (String)

    The name of the object in Object Storage

  • multipart_part_size (Integer) (defaults to: OCI::ObjectStorage::Transfer::MULTIPART_PART_SIZE)

    The size, in bytes, of each part of a multipart upload. This applies when we are uploading files from disk and defaults to 128 MiB

  • non_file_io_multipart_part_size (Integer) (defaults to: OCI::ObjectStorage::Transfer::NON_FILE_IO_MULTIPART_PART_SIZE)

    The size, in bytes, of each part of a multipart upload when we are reading from stdin or a non-file IO-like source (e.g. a StringIO). Defaults to 10 MiB

  • parallel_process_count (Integer) (defaults to: OCI::ObjectStorage::Transfer::Multipart::DEFAULT_PARALLEL_PROCESS_COUNT)

    How many parts we can upload in parallel. Defaults to 3

  • multipart_upload_opts (Hash) (defaults to: {})

Options Hash (multipart_upload_opts:):

  • :if_match (String)

    The entity tag to match. Only used for new multipart uploads and ignored otherwise.

  • :if_none_match (String)

    The entity tag to avoid matching. The only valid value is *, which indicates that the request should fail if the object already exists. Only used for new multipart uploads and ignored otherwise.

  • :opc_client_request_id (String)

    The client request ID for tracing. Will be applied to all requests made by this assembler.

  • :content_type (String)

    The content type of the object. Defaults to 'application/octet-stream' if not overridden. Only used for new multipart uploads and ignored otherwise.

  • :content_language (String)

    The content language of the object. Only used for new multipart uploads and ignored otherwise.

  • :content_encoding (String)

    The content encoding of the object. Only used for new multipart uploads and ignored otherwise.

  • :metadata (Hash<String, String>)

    A hash of string keys to string values representing any custom metadata to be applied to the object. Only used for new multipart uploads and ignored otherwise.



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 104

def initialize(object_storage_client:,
               namespace:,
               bucket_name:,
               object_name:,
               multipart_part_size: OCI::ObjectStorage::Transfer::MULTIPART_PART_SIZE,
               non_file_io_multipart_part_size: OCI::ObjectStorage::Transfer::NON_FILE_IO_MULTIPART_PART_SIZE,
               parallel_process_count: OCI::ObjectStorage::Transfer::Multipart::DEFAULT_PARALLEL_PROCESS_COUNT,
               multipart_upload_opts: {})
  @object_storage_client = object_storage_client
  @namespace = namespace
  @bucket_name = bucket_name
  @object_name = object_name
  @multipart_part_size = multipart_part_size
  @non_file_io_multipart_part_size = non_file_io_multipart_part_size
  @parallel_process_count = parallel_process_count
  @object_io = nil

  @manifest = {
    upload_id: nil,
    namespace: @namespace,
    bucket_name: @bucket_name,
    object_name: @object_name,
    object_io_or_file_path: nil,
    parts: OCI::ObjectStorage::Transfer::Multipart::Internal::MultipartUploadPartsCollection.new([])
  }

  @multipart_upload_opts = multipart_upload_opts
  @multipart_upload_opts.delete('content_md5') if @multipart_upload_opts.key?('content_md5')

  @max_attempts = DEFAULT_MAX_ATTEMPTS
  @base_sleep_millis = DEFAULT_BASE_SLEEP_MILLIS
  @exponential_growth_factor = DEFAULT_EXPONENTIAL_GROWTH_FACTOR
  @max_sleep_time_millis = DEFAULT_MAX_SLEEP_TIME_MILLIS
end

Instance Attribute Details

#base_sleep_millis ⇒ Integer

For exponential backoff and retry, the base time to use in our retry calculation in milliseconds. Defaults to 1000ms

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 74

def base_sleep_millis
  @base_sleep_millis
end

#bucket_name ⇒ String

The bucket where we'll upload the object

Returns:

  • (String)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 42

def bucket_name
  @bucket_name
end

#exponential_growth_factor ⇒ Integer

For exponential backoff and retry, the growth factor which is raised to the power of the number of attempts. Defaults to 2

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 82

def exponential_growth_factor
  @exponential_growth_factor
end

#manifest ⇒ Hash (readonly)

Contains the upload ID for the multipart upload and other upload-specific information (e.g. the destination namespace, bucket and object, and the parts which have been uploaded)

Returns:

  • (Hash)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 87

def manifest
  @manifest
end

#max_attempts ⇒ Integer

If we encounter a failure when performing an operation and need to retry, the maximum number of attempts we can make before declaring failure. Attempts are 1-based, i.e. the first call we make is considered attempt 1.

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 70

def max_attempts
  @max_attempts
end

#max_sleep_time_millis ⇒ Integer

The maximum amount of time to wait between retries. Defaults to 8000ms

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 78

def max_sleep_time_millis
  @max_sleep_time_millis
end

#multipart_part_size ⇒ Integer

The size, in bytes, of each part of a multipart upload. This applies when we are uploading files from disk and defaults to 128 MiB

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 51

def multipart_part_size
  @multipart_part_size
end

#multipart_upload_opts ⇒ Hash

A bag of optional parameters (e.g. the client request ID, metadata) which we can use when making calls to the Object Storage service.

Returns:

  • (Hash)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 65

def multipart_upload_opts
  @multipart_upload_opts
end

#namespace ⇒ String

The namespace containing the bucket in which to store the object

Returns:

  • (String)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 38

def namespace
  @namespace
end

#non_file_io_multipart_part_size ⇒ Integer

The size, in bytes, of each part of a multipart upload when we are reading from stdin or a non-file IO-like source (e.g. a StringIO). Defaults to 10 MiB

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 56

def non_file_io_multipart_part_size
  @non_file_io_multipart_part_size
end

#object_name ⇒ String

The name of the object in Object Storage

Returns:

  • (String)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 46

def object_name
  @object_name
end

#object_storage_client ⇒ OCI::ObjectStorage::ObjectStorageClient

The client used to interact with the Object Storage service



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 34

def object_storage_client
  @object_storage_client
end

#parallel_process_count ⇒ Integer

How many parts we can upload in parallel. Defaults to 3. Setting this to 1 is equivalent to disabling parallel part uploads.

Returns:

  • (Integer)


# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 61

def parallel_process_count
  @parallel_process_count
end

Instance Method Details

#abort(upload_id) ⇒ Response

Aborts a multipart upload.

Parameters:

  • upload_id (String)

    The ID of the multipart upload to abort.

Returns:

  • (Response)

    A Response object with data of type nil



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 365

def abort(upload_id)
  abort_opts = {}
  abort_opts[:opc_client_request_id] = @multipart_upload_opts[:opc_client_request_id] \
    if @multipart_upload_opts[:opc_client_request_id]

  response = make_retrying_call do
    @object_storage_client.abort_multipart_upload(
      @namespace,
      @bucket_name,
      @object_name,
      upload_id,
      abort_opts
    )
  end
  response
end

#commit ⇒ Response

Commits the multipart upload.

Returns:

  • (Response)

    A Response object with data of type nil. For a multipart upload, the headers of the response will contain an opc-multipart-md5 key.



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 327

def commit
  raise 'Cannot commit as this MultipartObjectAssembler has not been initialized with an upload' \
    unless @manifest[:upload_id]

  parts = @manifest[:parts].to_a

  commit_upload_details = OCI::ObjectStorage::Models::CommitMultipartUploadDetails.new
  commit_upload_details.parts_to_commit = []
  commit_upload_details.parts_to_exclude = []

  parts.each do |part|
    if part[:etag]
      commit_upload_details.parts_to_commit << OCI::ObjectStorage::Models::CommitMultipartUploadPartDetails.new(
        partNum: part[:part_number],
        etag: part[:etag]
      )
    end

    commit_upload_details.parts_to_exclude << part[:part_number] unless part[:etag]
  end

  response = make_retrying_call do
    @object_storage_client.commit_multipart_upload(
      @namespace,
      @bucket_name,
      @object_name,
      @manifest[:upload_id],
      commit_upload_details
    )
  end

  response
end
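
The part-selection loop in the listing above reduces to a single partition: parts that received an ETag are committed, and parts without one are excluded by part number. A minimal sketch with plain hashes in place of the SDK model classes follows; the helper name `split_parts_for_commit` is hypothetical.

```ruby
# Hypothetical helper mirroring the loop in #commit, using plain hashes
# instead of CommitMultipartUploadDetails / CommitMultipartUploadPartDetails.
def split_parts_for_commit(parts)
  uploaded, missing = parts.partition { |part| part[:etag] }
  {
    parts_to_commit: uploaded.map { |part| { part_num: part[:part_number], etag: part[:etag] } },
    parts_to_exclude: missing.map { |part| part[:part_number] }
  }
end
```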

#io_for_transfer=(object_io_or_file_path) ⇒ Object

Initializes the parts in the manifest based on the provided input. If the input is stdin, the list of parts in the manifest is left empty (in this case the parts are determined dynamically at upload time, since we may not have all the information in advance). If the input is the path to a file (a String), a file, or an IO-like object (e.g. a StringIO), the parts in the manifest are initialized so that the assembler can go through and upload them.

No uploads will be performed until the upload method is called.

Parameters:

  • object_io_or_file_path (String, IO)

    Either a path to the file to upload, an IO-like object containing the data to upload or $stdin.



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 197

def io_for_transfer=(object_io_or_file_path)
  # Supports IO-wrapping objects we can convert to an IO. An example is Rails'
  # ActionDispatch::Http::UploadedFile, which wraps an IO (a Tempfile) but
  # doesn't expose all the IO operations directly (e.g. you can't write to it, it's not seekable)
  #
  # This should be safe to use with IO and its subclasses as well as to_io is a method on IO:
  # http://ruby-doc.org/core-2.3.1/IO.html#method-i-to_io and returns itself if called on
  # an IO
  @manifest[:object_io_or_file_path] = object_io_or_file_path.to_io if object_io_or_file_path.respond_to?(:to_io)
  @manifest[:object_io_or_file_path] = object_io_or_file_path unless object_io_or_file_path.respond_to?(:to_io)

  return if stdin?(object_io_or_file_path)

  opened_file = false
  if object_io_or_file_path.is_a?(String)
    object_io = File.open(object_io_or_file_path, 'rb')
    opened_file = true
  end
  object_io = object_io_or_file_path if object_io_or_file_path.respond_to?(:seek)

  total_size = object_io.size if object_io.respond_to?(:size)
  total_size = object_io.stat.size unless object_io.respond_to?(:size)

  part_size_to_use = @multipart_part_size if file_based_io?(object_io_or_file_path)
  part_size_to_use = @non_file_io_multipart_part_size unless file_based_io?(object_io_or_file_path)

  offset = 0
  part_number = 1
  while offset < total_size
    part_info = {
      offset: offset,
      part_size: calculate_part_size(total_size, offset, part_size_to_use),
      part_number: part_number,
      part_md5_hash: nil
    }
    @manifest[:parts].push(part_info)
    offset += part_size_to_use
    part_number += 1
  end

  nil
ensure
  object_io.close if opened_file && object_io
end
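
The part-planning loop above can be exercised on its own. In this sketch, `part_size_at` is a hypothetical stand-in for the private `calculate_part_size` helper, which is not shown on this page; it assumes every part is full-sized except possibly the last. `plan_parts` and the local `MEBIBYTE` constant are likewise illustrative names.

```ruby
MEBIBYTE = 1024 * 1024

# Hypothetical stand-in for calculate_part_size (assumed behavior): a full
# part, or whatever remains when fewer bytes than part_size are left.
def part_size_at(total_size, offset, part_size)
  [part_size, total_size - offset].min
end

# Builds the same part_info hashes that io_for_transfer= pushes onto the manifest.
def plan_parts(total_size, part_size)
  (0...total_size).step(part_size).each_with_index.map do |offset, index|
    {
      offset: offset,
      part_size: part_size_at(total_size, offset, part_size),
      part_number: index + 1, # part numbers are 1-based
      part_md5_hash: nil
    }
  end
end
```

For a 300 MiB file with 128 MiB parts, this yields three parts at offsets 0, 128 MiB and 256 MiB, the last one 44 MiB long.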

#new_upload ⇒ Response

Initializes a new multipart upload.

Returns:

  • (Response)

    A Response object with data of type OCI::ObjectStorage::Models::MultipartUpload



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 142

def new_upload
  raise 'This MultipartObjectAssembler has already been initialized with an upload' if @manifest[:upload_id]

  create_multipart_upload_opts = {}
  create_multipart_upload_opts[:if_match] = @multipart_upload_opts[:if_match] if @multipart_upload_opts[:if_match]
  create_multipart_upload_opts[:if_none_match] = @multipart_upload_opts[:if_none_match] \
    if @multipart_upload_opts[:if_none_match]

  create_multipart_upload_opts[:opc_client_request_id] = @multipart_upload_opts[:opc_client_request_id] \
    if @multipart_upload_opts[:opc_client_request_id]

  create_multipart_upload_details =
    OCI::ObjectStorage::Models::CreateMultipartUploadDetails.new(object: @object_name)

  create_multipart_upload_details.content_type = @multipart_upload_opts[:content_type] \
    if @multipart_upload_opts[:content_type]

  create_multipart_upload_details.content_language = @multipart_upload_opts[:content_language] \
    if @multipart_upload_opts[:content_language]

  create_multipart_upload_details.content_encoding = @multipart_upload_opts[:content_encoding] \
    if @multipart_upload_opts[:content_encoding]

  create_multipart_upload_details.storage_tier = @multipart_upload_opts[:storage_tier] \
    if @multipart_upload_opts[:storage_tier]

  if @multipart_upload_opts[:metadata] && !@multipart_upload_opts[:metadata].empty?
    metadata = {}
    @multipart_upload_opts[:metadata].each do |key, value|
      metadata[key] = value if key.to_s.start_with?('opc-meta-')
      metadata["opc-meta-#{key}"] = value unless key.to_s.start_with?('opc-meta-')
    end
    create_multipart_upload_details.metadata = metadata
  end

  response = make_retrying_call do
    @object_storage_client.create_multipart_upload(
      @namespace, @bucket_name,
      create_multipart_upload_details,
      create_multipart_upload_opts
    )
  end
  @manifest[:upload_id] = response.data.upload_id

  response
end
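
The metadata handling in the listing above adds the `opc-meta-` prefix to any key that lacks it and leaves already-prefixed keys untouched. A standalone sketch of that rule follows; the helper name `prefix_metadata_keys` is hypothetical.

```ruby
# Hypothetical helper reproducing the key handling in #new_upload:
# keys already starting with 'opc-meta-' are kept as given, all others
# are turned into "opc-meta-<key>" strings.
def prefix_metadata_keys(metadata)
  metadata.each_with_object({}) do |(key, value), prefixed|
    already_prefixed = key.to_s.start_with?('opc-meta-')
    prefixed[already_prefixed ? key : "opc-meta-#{key}"] = value
  end
end
```

This means callers can pass either `'owner'` or `'opc-meta-owner'` in the :metadata option and end up with the same key.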

#resume(upload_id) ⇒ Array

Resumes uploading a multipart object to Object Storage. This assumes that the assembler already knows the parts it could potentially need to upload (i.e. they have been prepared via the io_for_transfer= method).

Prior to resuming the upload, we'll attempt to reconcile with Object Storage any previously uploaded parts so that they are not uploaded again.

Parameters:

  • upload_id (String)

    The ID of the multipart upload to resume.

Returns:

  • (Array)

    If the multipart upload was successful, an empty array. If there were errors then the array will contain one element per error which occurred.



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 296

def resume(upload_id)
  raise 'An upload ID must be provided' if upload_id.nil?
  raise 'Parts must be initialized prior to resuming' if @manifest[:parts].length.zero?

  manifest[:upload_id] = upload_id
  upload_parts = list_uploaded_parts(upload_id)
  known_parts = @manifest[:parts].to_a

  raise 'There are more parts on the server than parts to resume, please check the upload ID.' \
    if upload_parts.length > known_parts.length

  upload_parts.each do |up|
    index = up.part_number - 1
    manifest_part = known_parts[index]

    if manifest_part[:part_size] != up.size
      raise 'Cannot resume upload with different part size. ' \
        + "Parts were uploaded with a part size of #{up.size / OCI::ObjectStorage::Transfer::MEBIBYTE} MiB"
    end

    manifest_part[:etag] = up.etag
    manifest_part[:opc_md5] = up.md5
  end

  upload
end
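
The reconciliation step can be illustrated without a service call. In this sketch, `UploadedPart` is a hypothetical stand-in for the part summaries that the private `list_uploaded_parts` helper returns (that helper is not shown on this page), and `reconcile_parts` is an illustrative name. It applies the same checks as the listing above and returns the parts that would still need uploading.

```ruby
# Hypothetical stand-in for the objects returned when listing uploaded parts.
UploadedPart = Struct.new(:part_number, :size, :etag, :md5)

# Marks manifest parts the service already holds, then returns the rest.
def reconcile_parts(known_parts, uploaded_parts)
  raise 'There are more parts on the server than parts to resume' if uploaded_parts.length > known_parts.length

  uploaded_parts.each do |uploaded|
    manifest_part = known_parts[uploaded.part_number - 1] # part numbers are 1-based
    raise 'Cannot resume upload with different part size' if manifest_part[:part_size] != uploaded.size

    manifest_part[:etag] = uploaded.etag
    manifest_part[:opc_md5] = uploaded.md5
  end

  # Same filter that #upload applies when selecting pending parts.
  known_parts.select { |part| part[:opc_md5].nil? && part[:etag].nil? }
end
```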

#upload ⇒ Array

Performs a multipart upload.

Returns:

  • (Array)

    If the multipart upload was successful, an empty array. If there were errors then the array will contain one element per error which occurred.
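
The return contract (an empty array on success, otherwise one element per error) follows from a common threading pattern: each worker thread evaluates to nil or to the exception it hit, and the thread values are compacted. The sketch below shows that pattern in isolation. The helper name `upload_in_parallel` and the use of a plain `Queue` are assumptions; the SDK's internal parts collection and per-part upload routines are not shown on this page.

```ruby
# Hypothetical sketch: worker_count threads drain a shared queue of parts.
# Each thread's value is nil on success or the exception it hit, so
# compacting the values yields one element per failed worker.
def upload_in_parallel(parts, worker_count, &upload_part)
  pending = Queue.new
  parts.each { |part| pending << part }

  threads = Array.new(worker_count) do
    Thread.new do
      begin
        # Non-blocking pop raises ThreadError once the queue is empty.
        loop { upload_part.call(pending.pop(true)) }
      rescue ThreadError
        nil # queue drained: this worker finished without errors
      rescue StandardError => e
        pending.clear # stop handing out further parts after a failure
        e
      end
    end
  end

  threads.map(&:value).compact
end
```

A caller would pass a block that uploads a single part; with a worker count of 1 the parts are processed sequentially.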



# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 246

def upload
  all_parts = @manifest[:parts].to_a
  pending_parts = OCI::ObjectStorage::Transfer::Multipart::Internal::MultipartUploadPartsCollection.new(
    all_parts.select { |ap| ap[:opc_md5].nil? && ap[:etag].nil? }
  )

  unless file_based_io?(@manifest[:object_io_or_file_path]) || stdin?(@manifest[:object_io_or_file_path])
    seekable_io_wrapper = OCI::ObjectStorage::Transfer::Multipart::Internal::SeekableNonFilePartIOWrapper.new(
      source: @manifest[:object_io_or_file_path]
    )
  end

  if stdin?(@manifest[:object_io_or_file_path])
    stdin_io_wrapper = OCI::ObjectStorage::Transfer::Multipart::Internal::StdinPartIOWrapper.new(
      source: @manifest[:object_io_or_file_path]
    )
  end

  threads = []
  @parallel_process_count.times do
    thread = Thread.new do
      begin
        upload_non_stdin(pending_parts, seekable_io_wrapper) unless stdin?(@manifest[:object_io_or_file_path])
        upload_stdin(stdin_io_wrapper) if stdin?(@manifest[:object_io_or_file_path])

        nil
      rescue => e
        pending_parts.clear! # Stop any further processing on error
        e
      end
    end

    thread.abort_on_exception = true # Do not continue if we encounter exceptions
    threads << thread
  end

  threads.map(&:value).compact
end