Class: LazyData::Value

Inherits:
Object
Defined in:
lib/lazy_data/value.rb

Overview

A lazy value with thread-safe memoization. The first time it is accessed, it calls a given block to compute its value and caches that value. Subsequent accesses return the cached value.

At most one thread will be allowed to run the computation; if another thread is already in the middle of a computation, any new threads requesting the value will wait until the existing computation is complete, and will use that computation's result rather than kicking off their own computation.
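The compute-once, share-the-result behavior described above can be sketched with a mutex and a condition variable. The sketch rests on these assumptions:

- `MiniLazy` is a hypothetical class for illustration and is not the LazyData::Value implementation.
- It memoizes an exception as well as a value, but it has no retries, expiration, or backfill handling.
- Like the real class, it runs the block outside the mutex.

```ruby
# Hypothetical minimal sketch of thread-safe, compute-once memoization.
# Not LazyData::Value: no retries, no expiration, no backfill tracking.
class MiniLazy
  def initialize(&block)
    @block = block
    @mutex = Thread::Mutex.new
    @cond = Thread::ConditionVariable.new
    @state = :pending # one of :pending, :computing, :done
    @value = nil
    @error = nil
  end

  def get
    @mutex.synchronize do
      # Another thread is computing: wait, then share its result.
      @cond.wait(@mutex) while @state == :computing
      return result if @state == :done
      @state = :computing # this thread becomes the computing thread
    end

    # Run the block outside the mutex so arbitrary code cannot hold the lock.
    value = error = nil
    begin
      value = @block.call
    rescue StandardError => e
      error = e # memoize the failure too
    end

    @mutex.synchronize do
      @value = value
      @error = error
      @state = :done
      @cond.broadcast # wake every thread waiting for this computation
      result
    end
  end

  private

  # Must be called with @mutex held: return the value or re-raise the error.
  def result
    raise @error if @error
    @value
  end
end
```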

If a computation fails with an exception, that exception will also be memoized and reraised on subsequent accesses. A LazyData::Value can also be configured so subsequent accesses will retry the computation if the previous computation failed. The maximum number of retries is configurable, as is the retry "interval", i.e. the time since the last failure before an access will retry the computation.
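The following sketch shows how a memoized error, a retry limit, and a retry interval can interact.

- It is single-threaded for brevity.
- `RetryingLazy`, `max_tries`, `interval`, and the injectable `clock` are hypothetical names. They are not the Retries API.
- An access inside the interval re-raises the last error without calling the block.
- Once the tries run out, the error is kept permanently.

```ruby
# Hypothetical single-threaded sketch of memoized errors with limited retries.
# Names and parameters are illustrative, not the LazyData::Retries API.
class RetryingLazy
  def initialize(max_tries:, interval:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }, &block)
    @block = block
    @max_tries = max_tries
    @interval = interval
    @clock = clock
    @tries = 0
    @finished = false
    @value = nil
    @error = nil
    @retry_at = nil # monotonic time before which the last error is re-raised
  end

  def get
    if @finished
      raise @error if @error
      return @value
    end
    now = @clock.call
    # Still inside the retry interval: re-raise the memoized error.
    raise @error if @retry_at && now < @retry_at
    @tries += 1
    begin
      @value = @block.call
      @error = nil
      @finished = true
      @value
    rescue StandardError => e
      @error = e
      if @tries >= @max_tries
        @finished = true # retries exhausted: memoize the error for good
      else
        @retry_at = now + @interval
      end
      raise
    end
  end
end
```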

By default, a computation's memoized value (or final error after retries have been exhausted) is maintained for the lifetime of the Ruby process. However, a computation can also cause its result (or error) to expire after a specified number of seconds, forcing a recomputation on the next access after expiration. To do this, the block returns LazyData.expiring_value or calls LazyData.raise_expiring_error.
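Here is a sketch of expiration measured on the monotonic clock, with a default lifetime that an individual result can override.

- `ExpiringLazy` and the `Expiring` struct are hypothetical stand-ins. They only echo the idea behind LazyData.expiring_value and are not its actual signature.
- The clock is injectable so that the behavior is deterministic.

```ruby
# Hypothetical wrapper a block may return to give one result its own lifetime.
# It mirrors the idea of LazyData.expiring_value, not its real signature.
Expiring = Struct.new(:value, :lifetime)

# Hypothetical single-threaded sketch of a value that expires.
class ExpiringLazy
  def initialize(lifetime: nil,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }, &block)
    @default_lifetime = lifetime # nil means results never expire
    @clock = clock
    @block = block
    @computed = false
    @value = nil
    @expires_at = nil
  end

  def get
    now = @clock.call
    # Compute on first access, or on the first access after expiration.
    if !@computed || (@expires_at && now >= @expires_at)
      result = @block.call
      lifetime = @default_lifetime
      if result.is_a?(Expiring)
        lifetime = result.lifetime # per-result override of the default
        result = result.value
      end
      @value = result
      @expires_at = lifetime && (now + lifetime)
      @computed = true
    end
    @value
  end
end
```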


Constructor Details

#initialize(retries: nil, lifetime: nil, &block) ⇒ Value

Create a LazyData::Value.

You must pass a block that will be called to compute the value the first time it is accessed. The block should evaluate to the desired value, or raise an exception on error. To specify a value that expires, use LazyData.expiring_value. To raise an exception that expires, use LazyData.raise_expiring_error.

You can optionally pass a retry manager, which controls whether and how subsequent accesses call the block again after a compute attempt fails with an exception. A retry manager should be either an instance of Retries or an object that duck-types it.

Parameters:

  • retries (LazyData::Retries) (defaults to: nil)

    A retry manager. The default is a retry manager that tries only once.

  • lifetime (Numeric, nil) (defaults to: nil)

    The default lifetime of a computed value, in seconds, or nil (the default) for no expiration. The block can override this for an individual result by returning LazyData.expiring_value or calling LazyData.raise_expiring_error.

  • block (Proc)

    A block that can be called to attempt to compute the value.

Raises:

  • (ArgumentError)

    if no compute block is given


# File 'lib/lazy_data/value.rb', line 70

def initialize(retries: nil, lifetime: nil, &block)
  @retries = retries || Retries.new
  @default_lifetime = lifetime
  @compute_handler = block
  raise ArgumentError, "missing compute handler block" unless block

  # Internally implemented by a state machine, protected by a mutex that
  # ensures state transitions are consistent. The states themselves are
  # implicit in the values of the various instance variables. The
  # following are the major states:
  #
  # 1. **Pending** The value is not known and needs to be computed.
  #     @retries.finished? is false.
  #     @value is nil.
  #     @error is nil if no previous attempt has yet been made to
  #         compute the value, or set to the error that resulted from
  #         the most recent attempt.
  #     @expires_at is set to the monotonic time of the end of the
  #         current retry delay, or nil if the next computation attempt
  #         should happen immediately at the next access.
  #     @computing_thread is nil.
  #     @compute_notify is nil.
  #     @backfill_notify is set if currently backfilling, otherwise nil.
  #     From this state, calling #get will start computation (first
  #     waiting on @backfill_notify if present). Calling #expire! will
  #     have no effect.
  #
  # 2. **Computing** One thread has initiated computation. All other
  #     threads will be blocked (waiting on @compute_notify) until the
  #     computing thread finishes.
  #     @retries.finished? is false.
  #     @value and @error are nil.
  #     @expires_at is set to the monotonic time when computing started.
  #     @computing_thread is set to the thread that is computing.
  #     @compute_notify is set.
  #     @backfill_notify is nil.
  #     From this state, calling #get will cause the thread to wait
  #     (on @compute_notify) for the computing thread to complete.
  #     Calling #expire! will have no effect.
  #     When the computing thread finishes, it will transition either
  #     to Finished if the computation was successful or failed with
  #     no more retries, or back to Pending if computation failed with
  #     at least one retry remaining. It might also set @backfill_notify
  #     if other threads are waiting for completion.
  #
  # 3. **Finished** Computation has succeeded, or has failed and no
  #     more retries remain.
  #     @retries.finished? is true.
  #     either @value or @error is set, and the other is nil, depending
  #         on whether the final state is success or failure. (If both
  #         are nil, it is considered a @value of nil.)
  #     @expires_at is set to the monotonic time of expiration, or nil
  #         if there is no expiration.
  #     @computing_thread is nil.
  #     @compute_notify is nil.
  #     @backfill_notify is set if currently backfilling, otherwise nil.
  #     From this state, calling #get will either return the result or
  #     raise the error. If the current time exceeds @expires_at,
  #     however, it will first block on @backfill_notify (if present),
  #     then transition to Pending and proceed from there.
  #     Calling #expire! will block on @backfill_notify (if present)
  #     and then transition to Pending.
  #
  # @backfill_notify can be set in the Pending or Finished states. This
  # happens when threads that had been waiting on the previous
  # computation are still clearing out and returning their results.
  # Backfill must complete before the next computation attempt can be
  # started from the Pending state, or before an expiration can take
  # place from the Finished state. This prevents an "overlap" situation
  # where a thread that had been waiting for a previous computation has
  # not yet returned that result when some other thread starts
  # a new computation or expires the value. Note that it is okay for
  # #set! to be called during backfill; the threads still backfilling
  # will simply return the new value.
  #
  # Note: One might ask if it would be simpler to extend the mutex
  # across the entire computation, having it protect the computation
  # itself, instead of the current approach of having explicit compute
  # and backfill states with notifications and having the mutex protect
  # only the state transition. However, this would not have been able
  # to satisfy the requirement that we be able to detect whether a
  # thread asked for the value during another thread's computation,
  # and thus should "share" in that computation's result even if it's
  # a failure (rather than kicking off a retry). Additionally, we
  # consider it dangerous to have the computation block run inside a
  # mutex, because arbitrary code can run there which might result in
  # deadlocks.
  @mutex = ::Thread::Mutex.new
  # The evaluated, cached value, which could be nil.
  @value = nil
  # The last error encountered
  @error = nil
  # If non-nil, this is the CLOCK_MONOTONIC time when the current state
  # expires. If the state is finished, this is the time the current
  # value or error expires (while nil means it never expires). If the
  # state is pending, this is the time the wait period before the next
  # retry expires (and nil means there is no delay.) If the state is
  # computing, this is the time when computing started.
  @expires_at = nil
  # Set to a condition variable during computation. Broadcasts when the
  # computation is complete. Any threads wanting to get the value
  # during computation must wait on this first.
  @compute_notify = nil
  # Set to a condition variable during backfill. Broadcasts when the
  # last backfill thread is complete. Any threads wanting to expire the
  # cache or start a new computation during backfill must wait on this
  # first.
  @backfill_notify = nil
  # The number of threads waiting on backfill. Used to determine
  # whether to activate backfill_notify when a computation completes.
  @backfill_count = 0
  # The thread running the current computation. This is tested against
  # new requests to protect against deadlocks where a thread tries to
  # re-enter from its own computation. This is also tested when a
  # computation completes, to ensure that the computation is still
  # relevant (i.e. if #set! interrupts a computation, this is reset to
  # nil).
  @computing_thread = nil
end
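The note in the comments above gives deadlock as one reason not to run the compute block inside the mutex. One concrete case in Ruby is that Thread::Mutex is not reentrant. A block that re-enters its own value while the lock is held therefore raises ThreadError instead of completing.

`LockedLazy` is a hypothetical naive design, shown only to illustrate that hazard. The real class avoids it by computing outside the mutex and tracking @computing_thread to detect re-entry from a thread's own computation.

```ruby
# Hypothetical naive design that holds the mutex for the whole computation.
# Shown only to illustrate the hazard; LazyData::Value does not do this.
class LockedLazy
  def initialize(&block)
    @mutex = Thread::Mutex.new
    @block = block
    @done = false
    @value = nil
  end

  def get
    @mutex.synchronize do
      unless @done
        @value = @block.call(self) # arbitrary code runs while the lock is held
        @done = true
      end
      @value
    end
  end
end

# A block that (perhaps indirectly) asks for its own value re-locks the mutex.
lazy = LockedLazy.new { |me| me.get + 1 }
begin
  lazy.get
rescue ThreadError => e
  puts e.class # prints ThreadError
end
```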

Instance Method Details

#await(*extra_args, transient_errors: nil, max_tries: 1, max_time: nil, delay_epsilon: 0.0001) ⇒ Object

This method calls #get repeatedly until a final result is available or retries have been exhausted.

Note: this method spins on #get, although it honors any retry delay. Thus, it is best to call this only if retries are limited or a retry delay has been configured.

Parameters:

  • extra_args (Array)

    extra arguments to pass to the block

  • transient_errors (Array<Class>) (defaults to: nil)

    An array of exception classes that will be treated as transient and will allow await to continue retrying. Exceptions omitted from this list will be treated as fatal errors and abort the call. Default is [StandardError].

  • max_tries (Integer, nil) (defaults to: 1)

    The maximum number of times this will call #get before giving up, or nil for a potentially unlimited number of attempts. Default is 1.

  • max_time (Numeric, nil) (defaults to: nil)

    The maximum time in seconds this will spend before giving up, or nil (the default) for a potentially unlimited timeout.

  • delay_epsilon (Numeric) (defaults to: 0.0001)

    An extra delay, in seconds, that ensures retries happen after the retry delay period has elapsed.

Returns:

  • (Object)

    the value

Raises:

  • (Exception)

    if a fatal error happened, or retries have been exhausted.



# File 'lib/lazy_data/value.rb', line 261

def await(*extra_args, transient_errors: nil, max_tries: 1, max_time: nil, delay_epsilon: 0.0001)
  transient_errors ||= [StandardError]
  transient_errors = Array(transient_errors)
  expiry_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + max_time if max_time
  begin
    get(*extra_args)
  rescue *transient_errors
    # A snapshot of the state. It is possible that another thread has
    # changed this state since we received the error. This is okay because
    # our specification for this method is conservative: whatever we return
    # will have been correct at some point.
    state = internal_state
    # Don't retry unless we're in a state where retries can happen.
    raise if [InternalState::FAILED, InternalState::SUCCESS].include?(state.state)
    if max_tries
      # Handle retry countdown
      max_tries -= 1
      raise unless max_tries.positive?
    end
    # Determine the next delay
    delay = determine_await_retry_delay(state, expiry_time, delay_epsilon)
    # nil means we've exceeded the max time
    raise if delay.nil?
    sleep(delay) if delay.positive?
    retry
  end
end
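The control flow of #await above is a bounded retry loop built on Ruby's `rescue`/`retry`. It has three exits:

- An error not listed as transient propagates at once.
- A countdown on `max_tries` re-raises the last error when it reaches zero.
- A deadline on the monotonic clock stops further waiting.

`await_with_retries` below is a hypothetical standalone sketch of that loop. It calls a block instead of #get, and it sleeps a fixed `delay` instead of consulting the retry state as `determine_await_retry_delay` does.

```ruby
# Hypothetical standalone sketch of the loop in #await. It calls a block
# instead of #get and sleeps a fixed delay instead of consulting retry state.
def await_with_retries(transient_errors: [StandardError], max_tries: 1,
                       max_time: nil, delay: 0.0)
  transient_errors = Array(transient_errors)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_time if max_time
  begin
    yield
  rescue *transient_errors
    if max_tries
      max_tries -= 1
      raise unless max_tries.positive? # out of attempts: re-raise the last error
    end
    if deadline
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise if remaining <= delay # cannot wait out another delay before the deadline
    end
    sleep(delay) if delay.positive?
    retry # run the begin block again
  end
end
```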

#expire! ⇒ true, false

Force this cache to expire immediately, if computation is complete. Any cached value will be cleared, the retry count is reset, and the next access will call the compute block as if it were the first access. Returns true if this took place. Has no effect and returns false if the computation is not yet complete (i.e. if a thread is currently computing, or if the last attempt failed and retries have not yet been exhausted.)

Returns:

  • (true, false)

    whether the cache was expired



# File 'lib/lazy_data/value.rb', line 321

def expire!
  @mutex.synchronize do
    wait_backfill
    return false unless @retries.finished?
    do_expire
    true
  end
end

#get(*extra_args) ⇒ Object

Returns the value, or raises the error that caused the computation to fail.

If the value was previously cached, it will return that cached value, otherwise it will either run the computation to try to determine the value, or wait for another thread that is already running the computation. Thus, this method could block.

Any arguments passed will be forwarded to the block if called, but are ignored if a cached value is returned.

Returns:

  • (Object)

    the value

Raises:

  • (Exception)

    if an error happened while computing the value



# File 'lib/lazy_data/value.rb', line 205

def get(*extra_args)
  @mutex.synchronize do
    # Wait for any backfill to complete, and handle expiration first
    # because it might change the state.
    wait_backfill
    do_expire if should_expire?
    # Main state handling
    if @retries.finished?
      # finished state: return value or error
      return cached_value
    elsif !@compute_notify.nil?
      # computing state: wait for the computing thread to finish then
      # return its result
      wait_compute
      return cached_value
    else
      # pending state
      cur_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      # waiting for the next retry: return current error
      raise @error if @expires_at && cur_time < @expires_at
      # no delay: compute in the current thread
      enter_compute(cur_time)
      # and continue below
    end
  end

  # Gets here if we just transitioned from pending to compute
  perform_compute(extra_args)
end

#internal_state ⇒ InternalState

Returns the current low-level state immediately without waiting for computation.

Returns:

  • (InternalState)

    a snapshot of the current state


# File 'lib/lazy_data/value.rb', line 295

def internal_state
  @mutex.synchronize do
    if @retries.finished?
      if @error
        InternalState.new(InternalState::FAILED, nil, @error, @expires_at)
      else
        InternalState.new(InternalState::SUCCESS, @value, nil, @expires_at)
      end
    elsif @compute_notify.nil?
      InternalState.new(InternalState::PENDING, nil, @error, @expires_at)
    else
      InternalState.new(InternalState::COMPUTING, nil, nil, @expires_at)
    end
  end
end

#set!(value, lifetime: nil) ⇒ Object

Set the cache value explicitly and immediately. If a computation is in progress, it is "detached" and its result will no longer be considered.

Parameters:

  • value (Object)

    the value to set

  • lifetime (Numeric, nil) (defaults to: nil)

    the lifetime until expiration in seconds, or nil (the default) for no expiration.

Returns:

  • (Object)

    the value



# File 'lib/lazy_data/value.rb', line 339

def set!(value, lifetime: nil)
  @mutex.synchronize do
    @value = value
    @expires_at = determine_expiry(lifetime)
    @error = nil
    @retries.finish!
    unless @compute_notify.nil?
      enter_backfill
      leave_compute
    end
    value
  end
end
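#set! "detaches" an in-progress computation, so the late result is discarded. The source does this by resetting @computing_thread and checking it when the computation completes. The sketch below swaps in a different technique, a generation counter, to show the same idea:

- The computing thread remembers the generation it started in.
- #set! bumps the generation.
- A stale result is dropped, and the stale caller returns the explicitly set value instead.

`DetachableLazy` and its methods are hypothetical.

```ruby
# Hypothetical sketch: detaching an in-flight computation with a
# generation counter (LazyData::Value uses @computing_thread instead).
class DetachableLazy
  def initialize(&block)
    @block = block
    @mutex = Thread::Mutex.new
    @generation = 0
    @value = nil
  end

  # Run the block outside the lock; store its result only if no set!
  # happened in the meantime. Either way, return the current value.
  def compute
    gen = @mutex.synchronize { @generation }
    result = @block.call
    @mutex.synchronize do
      @value = result if gen == @generation
      @value
    end
  end

  def set!(value)
    @mutex.synchronize do
      @generation += 1 # any in-flight computation is now stale
      @value = value
    end
  end

  def value
    @mutex.synchronize { @value }
  end
end
```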