digraph G {
0 [labelType="html" label="<br><b>TakeOrderedAndProject</b><br><br>"];
subgraph cluster1 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n528 ms (1 ms, 3 ms, 6 ms (stage 83.0: task 2555))";
2 [labelType="html" label="<b>HashAggregate</b><br><br>time in aggregation build total (min, med, max (stageId: taskId))<br>310 ms (0 ms, 2 ms, 3 ms (stage 83.0: task 2592))<br>peak memory total (min, med, max (stageId: taskId))<br>12.5 GiB (64.3 MiB, 64.3 MiB, 64.3 MiB (stage 83.0: task 2554))<br>number of output rows: 330,975<br>avg hash probe bucket list iters (min, med, max (stageId: taskId)):<br>(1, 1, 1 (stage 83.0: task 2554))"];
}
3 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 331,002<br>shuffle write time total (min, med, max (stageId: taskId))<br>488 ms (13 ms, 18 ms, 20 ms (stage 82.0: task 2527))<br>records read: 331,002<br>local bytes read total (min, med, max (stageId: taskId))<br>2.2 MiB (8.9 KiB, 12.2 KiB, 13.5 KiB (stage 83.0: task 2725))<br>fetch wait time total (min, med, max (stageId: taskId))<br>10 ms (0 ms, 0 ms, 1 ms (stage 83.0: task 2592))<br>remote bytes read total (min, med, max (stageId: taskId))<br>2.2 MiB (8.8 KiB, 10.0 KiB, 13.6 KiB (stage 83.0: task 2592))<br>local blocks read: 2,824<br>remote blocks read: 2,776<br>data size total (min, med, max (stageId: taskId))<br>7.6 MiB (233.1 KiB, 279.2 KiB, 294.1 KiB (stage 82.0: task 2533))<br>shuffle bytes written total (min, med, max (stageId: taskId))<br>4.4 MiB (136.4 KiB, 159.4 KiB, 169.5 KiB (stage 82.0: task 2530))"];
subgraph cluster4 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: total (min, med, max (stageId: taskId))\n2.4 m (3.6 s, 5.7 s, 6.4 s (stage 82.0: task 2529))";
5 [labelType="html" label="<b>HashAggregate</b><br><br>time in aggregation build total (min, med, max (stageId: taskId))<br>2.4 m (3.6 s, 5.7 s, 6.3 s (stage 82.0: task 2529))<br>peak memory total (min, med, max (stageId: taskId))<br>7.0 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 82.0: task 2526))<br>number of output rows: 331,002"];
6 [labelType="html" label="<br><b>Project</b><br><br>"];
7 [labelType="html" label="<b>Scan ExistingRDD</b><br><br>number of output rows: 33,832,162"];
}
2->0;
3->2;
5->3;
6->5;
7->6;
}
8
TakeOrderedAndProject(limit=11, orderBy=[num_ratings#369L DESC NULLS LAST], output=[userId#379,num_ratings#380,user_type#372])
HashAggregate(keys=[userId#0L], functions=[count(1)])
WholeStageCodegen (2)
Exchange hashpartitioning(userId#0L, 200), true, [id=#504]
HashAggregate(keys=[userId#0L], functions=[partial_count(1)])
Project [userId#0L]
Scan ExistingRDD[userId#0L,movieId#1L,rating#2,timestamp_str#3L]
WholeStageCodegen (1)
== Parsed Logical Plan ==
GlobalLimit 11
+- LocalLimit 11
+- Project [cast(userId#0L as string) AS userId#379, cast(num_ratings#369L as string) AS num_ratings#380, cast(user_type#372 as string) AS user_type#381]
+- Sort [num_ratings#369L DESC NULLS LAST], true
+- Project [userId#0L, num_ratings#369L, CASE WHEN (num_ratings#369L > cast(50 as bigint)) THEN Frequent Raters ELSE Infrequent Raters END AS user_type#372]
+- Aggregate [userId#0L], [userId#0L, count(1) AS num_ratings#369L]
+- Sort [date#39 ASC NULLS FIRST], true
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) AS date#39]
+- LogicalRDD [userId#0L, movieId#1L, rating#2, timestamp_str#3L], false
== Analyzed Logical Plan ==
userId: string, num_ratings: string, user_type: string
GlobalLimit 11
+- LocalLimit 11
+- Project [cast(userId#0L as string) AS userId#379, cast(num_ratings#369L as string) AS num_ratings#380, cast(user_type#372 as string) AS user_type#381]
+- Sort [num_ratings#369L DESC NULLS LAST], true
+- Project [userId#0L, num_ratings#369L, CASE WHEN (num_ratings#369L > cast(50 as bigint)) THEN Frequent Raters ELSE Infrequent Raters END AS user_type#372]
+- Aggregate [userId#0L], [userId#0L, count(1) AS num_ratings#369L]
+- Sort [date#39 ASC NULLS FIRST], true
+- Project [userId#0L, movieId#1L, rating#2, timestamp_str#3L, from_unixtime(timestamp_str#3L, yyyy-MM-dd, Some(GMT)) AS date#39]
+- LogicalRDD [userId#0L, movieId#1L, rating#2, timestamp_str#3L], false
== Optimized Logical Plan ==
GlobalLimit 11
+- LocalLimit 11
+- Project [cast(userId#0L as string) AS userId#379, cast(num_ratings#369L as string) AS num_ratings#380, user_type#372]
+- Sort [num_ratings#369L DESC NULLS LAST], true
+- Aggregate [userId#0L], [userId#0L, count(1) AS num_ratings#369L, CASE WHEN (count(1) > 50) THEN Frequent Raters ELSE Infrequent Raters END AS user_type#372]
+- Project [userId#0L]
+- LogicalRDD [userId#0L, movieId#1L, rating#2, timestamp_str#3L], false
== Physical Plan ==
TakeOrderedAndProject(limit=11, orderBy=[num_ratings#369L DESC NULLS LAST], output=[userId#379,num_ratings#380,user_type#372])
+- *(2) HashAggregate(keys=[userId#0L], functions=[count(1)], output=[userId#0L, num_ratings#369L, user_type#372])
+- Exchange hashpartitioning(userId#0L, 200), true, [id=#504]
+- *(1) HashAggregate(keys=[userId#0L], functions=[partial_count(1)], output=[userId#0L, count#386L])
+- *(1) Project [userId#0L]
+- *(1) Scan ExistingRDD[userId#0L,movieId#1L,rating#2,timestamp_str#3L]