Background
The start and finish APIs on the instrumenter are public, but they cannot be used reliably because the time stack cannot determine the accurate start time. With proper start time detection, we could instrument async tasks: each task first fires the start event to register its start time, then sends the finish event on completion.
ActiveSupport::Notifications.subscribe('foo') do |name, start, finish, id, payload|
puts format(
'[%<name>s] %<id>s [%<start>i -> %<finish>i = %<duration>.3f] %<payload>s',
name: name,
id: id,
start: start.to_i,
finish: finish.to_i,
duration: finish - start,
payload: payload
)
end
instrumenter = ActiveSupport::Notifications.instrumenter
Assume the following five async tasks are queued:
payloads = Hash.new { |hash, key| hash[key] = {} }
1.upto(5) do |i|
payloads[i] = { id: i, start_time: Time.now.to_i }
instrumenter.start('foo', payloads[i])
sleep(1)
end
Simulating completion here:
1.upto(5) do |i|
sleep(1)
instrumenter.finish('foo', payloads[i].merge(end_time: Time.now.to_i))
end
# [foo] dd85abbba742e3982f02 [1642165418 -> 1642165426 = 7.654] {:id=>1, :start_time=>1642165414, :end_time=>1642165426}
# [foo] dd85abbba742e3982f02 [1642165417 -> 1642165427 = 9.659] {:id=>2, :start_time=>1642165415, :end_time=>1642165427}
# [foo] dd85abbba742e3982f02 [1642165416 -> 1642165428 = 11.666] {:id=>3, :start_time=>1642165416, :end_time=>1642165428}
# [foo] dd85abbba742e3982f02 [1642165415 -> 1642165429 = 13.674] {:id=>4, :start_time=>1642165417, :end_time=>1642165429}
# [foo] dd85abbba742e3982f02 [1642165414 -> 1642165430 = 15.683] {:id=>5, :start_time=>1642165418, :end_time=>1642165430}
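The start times are inaccurate: the stack returns them in last-in, first-out order, so the first task to finish is paired with the most recent start time. The correct start time cannot be recovered unless the timings are mapped to a unique identifier for each task.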
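The inaccurate start times follow from the last-in, first-out behaviour of a stack. The following is a minimal sketch in plain Ruby, not the Active Support implementation: StackTimer and simulate_stack_timing are hypothetical names introduced for illustration, and integer ticks replace real clock readings so the result is deterministic.

```ruby
# Hypothetical stand-in for a subscriber that keeps start times on a stack.
class StackTimer
  def initialize
    @timestack = []
  end

  # Record a start time; nothing ties it to the task that started.
  def start(time)
    @timestack.push(time)
  end

  # Pair the finish with whatever start time is on top of the stack.
  def finish(time)
    started = @timestack.pop
    { started: started, finished: time, duration: time - started }
  end
end

# Start tasks 1..5 at ticks 1..5, then finish them in the same order
# at ticks 6..10. Each task really runs for 5 ticks.
def simulate_stack_timing
  timer = StackTimer.new
  1.upto(5) { |i| timer.start(i) }
  (1..5).map { |i| timer.finish(5 + i) }
end

simulate_stack_timing.each_with_index do |result, index|
  puts "task #{index + 1}: reported start=#{result[:started]}, duration=#{result[:duration]}"
end
```

In this sketch only task 3 is reported with its own start time; the others are paired with the start time of a different task, which is the same pattern as in the output above.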
Proposal
Currently, the start times are pushed onto a stack. Instead, we can use a Concurrent::Map with task identifiers as keys and start times as values.
The instrumenter injects the task id when instrument is called. Direct calls to start and finish must include the same id in the payload under the :instrumented_task_id key.
diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb
index c69e8cd8d4..2dd41dafac 100644
--- a/activesupport/lib/active_support/notifications/instrumenter.rb
+++ b/activesupport/lib/active_support/notifications/instrumenter.rb
@@ -18,6 +18,7 @@ def initialize(notifier)
# notifier. Notice that events get sent even if an error occurs in the
# passed-in block.
def instrument(name, payload = {})
+ payload[:instrumented_task_id] = unique_id
# some of the listeners might have state
listeners_state = start name, payload
begin
The notifier maintains the time map:
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 0759d3a086..8b90fbc65c 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -226,13 +226,13 @@ def publish(name, *args)
end
def start(name, id, payload)
- timestack = IsolatedExecutionState[:_timestack] ||= []
- timestack.push Time.now
+ start_times = IsolatedExecutionState[:_timestack] ||= Concurrent::Map.new
+ start_times[payload[:instrumented_task_id]] = Time.now
end
def finish(name, id, payload)
- timestack = IsolatedExecutionState[:_timestack]
- started = timestack.pop
+ start_times = IsolatedExecutionState[:_timestack]
+ started = start_times.delete(payload[:instrumented_task_id])
@delegate.call(name, started, Time.now, id, payload)
end
end
@@ -243,13 +243,13 @@ def publish(name, *args)
end
def start(name, id, payload)
- timestack = IsolatedExecutionState[:_timestack_monotonic] ||= []
- timestack.push Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ start_times = IsolatedExecutionState[:_timestack_monotonic] ||= Concurrent::Map.new
+ start_times[payload[:instrumented_task_id]] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def finish(name, id, payload)
- timestack = IsolatedExecutionState[:_timestack_monotonic]
- started = timestack.pop
+ start_times = IsolatedExecutionState[:_timestack_monotonic]
+ started = start_times.delete(payload[:instrumented_task_id])
@delegate.call(name, started, Process.clock_gettime(Process::CLOCK_MONOTONIC), id, payload)
end
end
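To illustrate the intended behaviour, here is a minimal sketch in plain Ruby that keys start times by task id. It is not the patched Active Support code: KeyedTimer and simulate_keyed_timing are hypothetical names, a Hash guarded by a Mutex stands in for Concurrent::Map so the example has no gem dependency, and integer ticks replace clock readings.

```ruby
# Hypothetical stand-in for a subscriber that keys start times by task id.
class KeyedTimer
  def initialize
    @start_times = {}   # assumption: stands in for Concurrent::Map
    @lock = Mutex.new
  end

  # Record the start time under the task id carried in the payload.
  def start(time, payload)
    task_id = payload.fetch(:instrumented_task_id)
    @lock.synchronize { @start_times[task_id] = time }
  end

  # Look up and remove the start time recorded for this task id.
  def finish(time, payload)
    task_id = payload.fetch(:instrumented_task_id)
    started = @lock.synchronize do
      @start_times.delete(task_id) { raise KeyError, "no start recorded for #{task_id}" }
    end
    { id: task_id, started: started, finished: time, duration: time - started }
  end
end

# Start tasks 1..5 at ticks 1..5, then finish them in the same order
# at ticks 6..10, passing the same payload to start and finish.
def simulate_keyed_timing
  timer = KeyedTimer.new
  payloads = (1..5).map { |i| { instrumented_task_id: "task-#{i}" } }
  payloads.each_with_index { |payload, index| timer.start(index + 1, payload) }
  payloads.each_with_index.map { |payload, index| timer.finish(index + 6, payload) }
end

simulate_keyed_timing.each do |result|
  puts "#{result[:id]}: reported start=#{result[:started]}, duration=#{result[:duration]}"
end
```

Because each finish looks up its own key, every task is reported with its own start time regardless of completion order. The sketch raises KeyError when finish is called for an id that was never started; how the real subscribers should treat that case is left open here.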