RDD
class pysparkling.RDD(partitions, ctx)
In Spark's original form, RDDs are Resilient, Distributed Datasets. This class reimplements the same interface with the goal of being fast on small data, at the cost of resilience and distribution.
Parameters:
- partitions – The partitions backing this RDD.
- ctx (Context) – The Context this RDD belongs to.
compute(split, task_context)
interface to extend behavior for specific cases
Parameters: split (Partition) – a partition
aggregate(zeroValue, seqOp, combOp)
aggregate
[distributed]
Parameters:
- zeroValue – The initial value to an aggregation, for example 0 or 0.0 for aggregating ints and floats, but any Python object is possible.
- seqOp – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
- combOp – A reference to a function that combines outputs of seqOp. In the first iteration, the current state is zeroValue.
Returns: Output of combOp operations.
Example:
>>> from pysparkling import Context
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> Context().parallelize(
...     [1, 2, 3, 4], 2
... ).aggregate((0, 0), seqOp, combOp)
(10, 4)
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)
aggregate by key
Parameters:
- zeroValue – The initial value to an aggregation, for example 0 or 0.0 for aggregating ints and floats, but any Python object is possible.
- seqFunc – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
- combFunc – A reference to a function that combines outputs of seqFunc. In the first iteration, the current state is zeroValue.
- numPartitions (int) – Not used.
Returns: An RDD with the output of combFunc operations.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> seqOp = (lambda x, y: x + y)
>>> combOp = (lambda x, y: x + y)
>>> r = Context().parallelize(
...     [('a', 1), ('b', 2), ('a', 3), ('c', 4)]
... ).aggregateByKey(0, seqOp, combOp).collectAsMap()
>>> (r['a'], r['b'])
(4, 2)
cache()
Once a partition is computed, cache the result.
Alias for persist().
Example:
>>> import pysparkling
>>>
>>> n_exec = 0
>>>
>>> def _map(e):
...     global n_exec
...     n_exec += 1
...     return e*e
>>>
>>> sc = pysparkling.Context()
>>> my_rdd = sc.parallelize([1, 2, 3, 4], 2).map(_map).cache()
>>>
>>> # no exec until here
>>> n_exec
0
>>> # to get first element, compute the first partition
>>> my_rdd.first()
1
>>> n_exec
2
>>> # now compute the rest
>>> my_rdd.collect()
[1, 4, 9, 16]
>>> n_exec
4
>>> # now _map() was executed on all partitions and should
>>> # not be executed again
>>> my_rdd.collect()
[1, 4, 9, 16]
>>> n_exec
4
cartesian(other)
cartesian product of this RDD with other
Parameters: other (RDD) – Another RDD.
Return type: RDD
Note: This is currently implemented as a local operation requiring all data to be pulled on one machine.
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
coalesce(numPartitions, shuffle=False)
coalesce
Parameters:
- numPartitions (int) – Number of partitions in the resulting RDD.
- shuffle – (optional) Not used.
Return type: RDD
Note: This is currently implemented as a local operation requiring all data to be pulled on one machine.
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(1).getNumPartitions()
1
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(4).getNumPartitions()
2
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2, 3, 4, 5, 6, 7, 8], 5)
>>> rdd.foreachPartition(lambda x: print(list(x)))
[1]
[2, 3]
[4]
[5, 6]
[7, 8]
>>> rdd.coalesce(4).foreachPartition(lambda x: print(list(x)))
[1, 2, 3]
[4]
[5, 6]
[7, 8]
>>> rdd.coalesce(4).coalesce(3).foreachPartition(lambda x: print(list(x)))
[1, 2, 3, 4]
[5, 6]
[7, 8]
>>> rdd.coalesce(3).foreachPartition(lambda x: print(list(x)))
[1, 2, 3]
[4, 5, 6]
[7, 8]
cogroup(other, numPartitions=None)
Groups keys from both RDDs together. Values are nested iterators.
Parameters:
- other (RDD) – The other RDD.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> c = Context()
>>> a = c.parallelize([('house', 1), ('tree', 2)])
>>> b = c.parallelize([('house', 3)])
>>>
>>> [(k, sorted(list([list(vv) for vv in v])))
...  for k, v in sorted(a.cogroup(b).collect())
... ]
[('house', [[1], [3]]), ('tree', [[], [2]])]
collect()
returns the entire dataset as a list
Return type: list
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).collect()
[1, 2, 3]
collectAsMap()
returns a dictionary for a pair dataset
Return type: dict
Example:
>>> from pysparkling import Context
>>> d = Context().parallelize([('a', 1), ('b', 2)]).collectAsMap()
>>> (d['a'], d['b'])
(1, 2)
count()
number of entries in this dataset
Return type: int
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).count()
3
countByKey()
returns a dict containing the count for every key
Return type: dict
Example:
>>> from pysparkling import Context
>>> Context().parallelize(
...     [('a', 1), ('b', 2), ('b', 2)]
... ).countByKey()['b']
2
countByValue()
returns a dict containing the count for every value
Return type: dict
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1]).countByValue()[2]
2
distinct(numPartitions=None)
returns only distinct elements
Parameters: numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1]).distinct().count()
3
filter(f)
filter elements
Parameters: f – a function that decides whether to keep an element
Return type: RDD
Example:
>>> from pysparkling import Context
>>> Context().parallelize(
...     [1, 2, 2, 4, 1, 3, 5, 9], 3,
... ).filter(lambda x: x % 2 == 0).collect()
[2, 2, 4]
first()
returns the first element in the dataset
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3).first()
1
Works also with empty partitions:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2], 20).first()
1
flatMap(f, preservesPartitioning=True)
map followed by flatten
Parameters:
- f – The map function.
- preservesPartitioning – (optional) Preserve the partitioning of the original RDD. Default True.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> Context().parallelize(['hello', 'world']).flatMap(
...     lambda x: [ord(ch) for ch in x]
... ).collect()
[104, 101, 108, 108, 111, 119, 111, 114, 108, 100]
flatMapValues(f)
map operation on values in a (key, value) pair followed by a flatten
Parameters: f – The map function.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> Context().parallelize([(1, 'hi'), (2, 'world')]).flatMapValues(
...     lambda x: [ord(ch) for ch in x]
... ).collect()
[(1, 104), (1, 105), (2, 119), (2, 111), (2, 114), (2, 108), (2, 100)]
fold(zeroValue, op)
fold
Parameters:
- zeroValue – The initial value, for example 0 or 0.0.
- op – The reduce operation.
Returns: The folded (or aggregated) value.
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 7, 2])
>>> my_rdd.fold(0, lambda a, b: a+b)
13
foldByKey(zeroValue, op)
Fold (or aggregate) value by key.
Parameters:
- zeroValue – The initial value, for example 0 or 0.0.
- op – The reduce operation.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([('a', 4), ('b', 7), ('a', 2)])
>>> my_rdd.foldByKey(0, lambda a, b: a+b).collectAsMap()['a']
6
foreach(f)
applies f to every element
It does not return a new RDD like map().
Parameters: f – Apply a function to every element.
Return type: None
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([1, 2, 3])
>>> a = []
>>> my_rdd.foreach(lambda x: a.append(x))
>>> len(a)
3
foreachPartition(f)
applies f to every partition
It does not return a new RDD like mapPartitions().
Parameters: f – Apply a function to every partition.
Return type: None
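A minimal usage sketch (not from the original reference; it reuses the data and partitioning of the mapPartitions() example below):
>>> from pysparkling import Context
>>> partition_sums = []
>>> Context().parallelize([1, 2, 3, 4], 2).foreachPartition(
...     lambda p: partition_sums.append(sum(p))
... )
>>> sorted(partition_sums)
[3, 7]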
fullOuterJoin(other, numPartitions=None)
returns the full outer join of two RDDs
The output contains all keys from both input RDDs, with missing keys replaced with None.
Parameters:
- other (RDD) – The other RDD.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> sc = Context()
>>> rdd1 = sc.parallelize([('a', 0), ('b', 1)])
>>> rdd2 = sc.parallelize([('b', 2), ('c', 3)])
>>> sorted(
...     rdd1.fullOuterJoin(rdd2).collect()
... )
[('a', (0, None)), ('b', (1, 2)), ('c', (None, 3))]
groupBy(f, numPartitions=None)
group by f
Parameters:
- f – Function returning a key given an element of the dataset.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 7, 2])
>>> my_rdd.groupBy(lambda x: x % 2).mapValues(sorted).collect()
[(0, [2, 4]), (1, [7])]
groupByKey(numPartitions=None)
group by key
Parameters: numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
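A minimal usage sketch (not from the original reference), following the pattern of the groupBy() example above:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([('a', 1), ('b', 2), ('a', 3)])
>>> sorted(rdd.groupByKey().mapValues(sorted).collect())
[('a', [1, 3]), ('b', [2])]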
histogram(buckets)
histogram
Parameters: buckets – A list of bucket boundaries or an int for the number of buckets.
Returns: A tuple (bucket_boundaries, histogram_values) where bucket_boundaries is a list of length n+1 boundaries and histogram_values is a list of length n with the values of each bucket.
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([0, 4, 7, 4, 10])
>>> b, h = my_rdd.histogram(10)
>>> h
[1, 0, 0, 0, 2, 0, 0, 1, 0, 0, 1]
intersection(other)
intersection of this and other RDD
Parameters: other (RDD) – The other dataset to do the intersection with.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([0, 4, 7, 4, 10])
>>> rdd2 = Context().parallelize([3, 4, 7, 4, 5])
>>> rdd1.intersection(rdd2).collect()
[4, 7]
join(other, numPartitions=None)
join
Parameters:
- other (RDD) – The other RDD.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> rdd1.join(rdd2).collect()
[(1, (1, 3))]
keyBy(f)
key by f
Parameters: f – Function that returns a key from a dataset element.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([0, 4, 7, 4, 10])
>>> rdd.keyBy(lambda x: x % 2).collect()
[(0, 0), (0, 4), (1, 7), (0, 4), (0, 10)]
keys()
keys of a pair dataset
Return type: RDD
Example:
>>> from pysparkling import Context
>>> Context().parallelize([(0, 1), (1, 1)]).keys().collect()
[0, 1]
leftOuterJoin(other, numPartitions=None)
left outer join
Parameters:
- other (RDD) – The other RDD.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[(0, (1, None)), (1, (1, 3))]
lookup(key)
Return all the (key, value) pairs where the given key matches.
Parameters: key – The key to lookup.
Return type: list
Example:
>>> from pysparkling import Context
>>> Context().parallelize([(0, 1), (1, 1), (1, 3)]).lookup(1)
[1, 3]
map(f)
map
Parameters: f – map function for elements
Return type: RDD
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).map(lambda x: x+1).collect()
[2, 3, 4]
mapPartitions(f, preservesPartitioning=False)
map partitions
Parameters: f – map function for partitions
Return type: RDD
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2, 3, 4], 2)
>>> def f(iterator):
...     yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
mapPartitionsWithIndex(f, preservesPartitioning=False)
map partitions with index
Parameters: f – map function for (index, partition)
Return type: RDD
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([9, 8, 7, 6, 5, 4], 3)
>>> def f(splitIndex, iterator):
...     yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
3
mapValues(f)
map values in a pair dataset
Parameters: f – map function for values
Return type: RDD
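A minimal usage sketch (not from the original reference):
>>> from pysparkling import Context
>>> Context().parallelize([(1, 'a'), (2, 'bb')]).mapValues(len).collect()
[(1, 1), (2, 2)]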
max()
returns the maximum element
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3, 4, 3, 2], 2).max() == 4
True
mean()
returns the mean of this dataset
Example:
>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).mean()
5.0
partitionBy(numPartitions, partitionFunc=None)
Return a partitioned copy of this key-value RDD.
Parameters:
- numPartitions (int) – Number of partitions.
- partitionFunc (function) – Partition function.
Return type: RDD
Example where even numbers get assigned to partition 0 and odd numbers to partition 1:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> keyvalue_rdd = rdd.map(lambda x: (x, x))
>>> keyvalue_rdd.partitionBy(2).keys().collect()
[2, 8, 1, 3, 7, 5]
persist(storageLevel=None)
Cache the results of computed partitions.
Parameters: storageLevel – Not used.
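A minimal usage sketch (not from the original reference); see cache() above for a longer example of the same behavior:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([1, 2, 3]).map(lambda x: x * x).persist()
>>> my_rdd.collect()
[1, 4, 9]
>>> my_rdd.collect()  # second run is served from the cached partitions
[1, 4, 9]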
pipe(command, env=None)
Run a command with the elements in the dataset as argument.
Parameters:
- command – Command line command to run.
- env (dict) – environment variables
Return type: RDD
Warning: Unsafe for untrusted data.
Example:
>>> from pysparkling import Context
>>> piped = Context().parallelize(['0', 'hello', 'world']).pipe('echo')
>>> b'hello\n' in piped.collect()
True
randomSplit(weights, seed=None)
Split the RDD into a few RDDs according to the given weights.
Parameters:
- weights (list) – relative lengths of the resulting RDDs
- seed (int) – seed for the random number generator
Returns: a list of RDDs
Return type: list
Note: Creating the new RDDs is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(500))
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], seed=42)
>>> (rdd1.count(), rdd2.count())
(199, 301)
reduce(f)
reduce
Parameters: f – A commutative and associative binary operator.
Example:
>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10], 2).reduce(lambda a, b: a+b)
25
>>> Context().parallelize([0, 4, 7, 4, 10], 10).reduce(lambda a, b: a+b)
25
>>> Context().parallelize([0], 10).reduce(lambda a, b: a+b)
0
>>> Context().parallelize([], 10).reduce(lambda a, b: a+b)
Traceback (most recent call last):
...
ValueError: Can not reduce() empty RDD
reduceByKey(f, numPartitions=None)
reduce by key
Parameters:
- f – A commutative and associative binary operator.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: This operation includes a groupByKey() which is a local operation.
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([(0, 1), (1, 1), (1, 3)])
>>> rdd.reduceByKey(lambda a, b: a+b).collect()
[(0, 1), (1, 4)]
repartition(numPartitions)
repartition
Parameters: numPartitions (int) – Number of partitions in new RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).repartition(1).getNumPartitions()
1
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).repartition(4).getNumPartitions()
4
repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=None, ascending=True, keyfunc=None)
Repartition and sort within each partition.
Parameters:
- numPartitions (int) – Number of partitions in new RDD.
- partitionFunc – function that partitions
- ascending – Sort order.
- keyfunc – Returns the value that will be sorted.
Return type: RDD
Example where even numbers are assigned to partition 0 and odd numbers to partition 1 and then the partitions are sorted individually:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> kv_rdd = rdd.map(lambda x: (x, x))
>>> processed = kv_rdd.repartitionAndSortWithinPartitions(2)
>>> processed.keys().collect()
[2, 8, 1, 3, 5, 7]
rightOuterJoin(other, numPartitions=None)
right outer join
Parameters:
- other (RDD) – The other RDD.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd1 = sc.parallelize([(0, 1), (1, 1)])
>>> rdd2 = sc.parallelize([(2, 1), (1, 3)])
>>> sorted(rdd1.rightOuterJoin(rdd2).collect())
[(1, (1, 3)), (2, (None, 1))]
sample(withReplacement, fraction, seed=None)
randomly sample
Parameters:
- withReplacement (bool) – sample with or without replacement
- fraction (float) – sampling probability or expectation (see below)
- seed (int) – seed for the random number generator
Return type: RDD
Sampling without replacement uses Bernoulli sampling and fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling where fraction is the expectation.
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(1000))
>>> sampled = rdd.sample(False, 0.1, seed=5).collect()
>>> len(sampled)
115
>>> sampled_with_replacement = rdd.sample(True, 5.0, seed=5).collect()
>>> len(sampled_with_replacement) in (5067, 5111)  # w/o, w/ numpy
True
sampleByKey(withReplacement, fractions, seed=None)
randomly sample by key
Parameters:
- withReplacement (bool) – sample with or without replacement
- fractions (dict) – per-key sampling fraction
- seed (int) – seed for the random number generator
Return type: RDD
Sampling without replacement uses Bernoulli sampling and fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling where fraction is the expectation.
Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> fractions = {"a": 0.2, "b": 0.1}
>>> rdd = (sc
...        .parallelize(fractions.keys())
...        .cartesian(sc.parallelize(range(0, 1000))))
>>> sample = (rdd
...           .sampleByKey(False, fractions, 2)
...           .groupByKey().collectAsMap())
>>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
True
>>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
True
>>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
True
sampleStdev()
sample standard deviation
Return type: float
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).sampleStdev()
1.0
sampleVariance()
sample variance
Return type: float
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).sampleVariance()
1.0
saveAsPickleFile(path, batchSize=10)
save as pickle file
Returns: self
Return type: RDD
Warning: The output of this function is incompatible with the PySpark output as there is no pure Python way to write Sequence files.
Example:
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> d = ['hello', 'world', 1, 2]
>>> rdd = Context().parallelize(d).saveAsPickleFile(tmpFile.name)
>>> 'hello' in Context().pickleFile(tmpFile.name).collect()
True
saveAsTextFile(path, compressionCodecClass=None)
save as text file
If the RDD has a single partition, its contents are stored directly at the given path. If it has more partitions, the data of each partition is stored in an individual file under path/part-00000 and so on; once all partitions are written, the file path/_SUCCESS is written last.
Parameters:
- path – Destination of the text file.
- compressionCodecClass – Not used.
Returns: self
Return type: RDD
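A minimal usage sketch (not from the original reference), mirroring the saveAsPickleFile() example and assuming the output can be read back with Context().textFile():
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> rdd = Context().parallelize(['hello', 'world']).saveAsTextFile(tmpFile.name)
>>> 'hello' in Context().textFile(tmpFile.name).collect()
True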
sortBy(keyfunc, ascending=True, numPartitions=None)
sort by keyfunc
Parameters:
- keyfunc – Returns the value that will be sorted.
- ascending (bool) – Sort order. Default True.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Note: Sorting is currently implemented as a local operation.
Examples:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([5, 1, 2, 3])
>>> rdd.sortBy(lambda x: x).collect()
[1, 2, 3, 5]

>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 5, 2, 3])
>>> rdd.sortBy(lambda x: x, ascending=False).collect()
[5, 3, 2, 1]
sortByKey(ascending=True, numPartitions=None, keyfunc=operator.itemgetter(0))
sort by key
Parameters:
- ascending (bool) – Sort order. Default True.
- numPartitions (int) – Number of partitions in the resulting RDD.
- keyfunc – Returns the value that will be sorted; defaults to the key of each pair.
Return type: RDD
Note: Sorting is currently implemented as a local operation.
Examples:
>>> from pysparkling import Context
>>> rdd = Context().parallelize(
...     [(5, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
... )
>>> rdd.sortByKey().collect()[0][1] == 'b'
True

>>> from pysparkling import Context
>>> rdd = Context().parallelize(
...     [(1, 'b'), (5, 'a'), (2, 'c'), (3, 'd')]
... )
>>> rdd.sortByKey(ascending=False).collect()[0][1] == 'a'
True
stats()
stats
Return type: StatCounter
Example:
>>> from pysparkling import Context
>>> d = [1, 4, 9, 16, 25, 36]
>>> s = Context().parallelize(d, 3).stats()
>>> sum(d)/len(d) == s.mean()
True
stdev()
standard deviation
Return type: float
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1.5, 2.5]).stdev()
0.5
subtract(other, numPartitions=None)
subtract
Parameters:
- other (RDD) – The RDD whose elements are removed from this one.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(1, 1), (1, 3)])
>>> rdd1.subtract(rdd2).collect()
[(0, 1)]
sum()
sum of all the elements
Return type: float
Example:
>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).sum()
25
take(n)
Take n elements and return them in a list.
Only evaluates the partitions that are necessary to return n elements.
Parameters: n (int) – Number of elements to return.
Return type: list
Example:
>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2]).take(2)
[4, 7]
Another example where only the first two partitions are computed (check the debug logs):
>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2], 3).take(2)
[4, 7]
takeSample(n)
take sample
Assumes samples are evenly distributed between partitions. Only evaluates the partitions that are necessary to return n elements.
Parameters: n (int) – The number of elements to sample.
Return type: list
Example:
>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2]).takeSample(1)[0] in [4, 7, 2]
True
Another example where only one partition is computed (check the debug logs):
>>> from pysparkling import Context
>>> d = [4, 9, 7, 3, 2, 5]
>>> Context().parallelize(d, 3).takeSample(1)[0] in d
True
toLocalIterator()
Returns an iterator over the dataset.
Example:
>>> from pysparkling import Context
>>> sum(Context().parallelize([4, 9, 7, 3, 2, 5], 3).toLocalIterator())
30
top(num, key=None)
Top N elements in descending order.
Parameters:
- num (int) – number of elements
- key – optional key function
Return type: list
Example:
>>> from pysparkling import Context
>>> r = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> r.top(2)
[9, 7]
union(other)
union
Parameters: other (RDD) – The other RDD for the union.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.union(my_rdd).count()
12
variance()
The variance of the dataset.
Return type: float
Example:
>>> from pysparkling import Context
>>> Context().parallelize([1.5, 2.5]).variance()
0.25
zip(other)
zip
Parameters: other (RDD) – Other dataset to zip with.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.zip(my_rdd).collect()
[(4, 4), (9, 9), (7, 7), (3, 3), (2, 2), (5, 5)]
zipWithIndex()
Returns pairs of an original element and its index.
Return type: RDD
Note: Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.zipWithIndex().collect()
[(4, 0), (9, 1), (7, 2), (3, 3), (2, 4), (5, 5)]
zipWithUniqueId()
Zip every entry with a unique index.
This is a fast operation.
Return type: RDD
Example:
>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([423, 234, 986, 5, 345], 3)
>>> my_rdd.zipWithUniqueId().collect()
[(423, 0), (234, 1), (986, 4), (5, 2), (345, 5)]