函数式编程与分布式(CRDTs)

(多)线程/协程并不能让你更容易/优雅地解决并行/分布式问题 –除非你是库的作者。

线程操作/异常处理/日志/并发/竞态/顺序/消息同步/任务分配/网络延迟/宕机/错误恢复/重复计算/缓存…这些问题都不是只有多线程或者协程能解决的,对于大多数人来讲,合理的做法应该是构造正确的抽象,上述问题应当由框架解决,而不是自己做(除非你是一个库的作者)。

考虑如下经典问题:

实时统计某通信软件中聊天消息的单词词频(不可使用已有的分布式计算框架 eg, Spark, Flink, Map-Reduce, Storm…)

问题分析:

  1. 单机处理需要的时间太长,最好能够通过分布式计算解决
  2. 对于这个问题来讲,任务拆分其实不难
  3. 难点在于如何实时合并各个节点统计的结果

什么是CAP定理?

In theoretical computer science, the CAP theorem, also named Brewer’s theorem after computer scientist Eric Brewer, states that it is impossible for a distributed data store to simultaneously provide more than two out of the following three guarantees:

  • Consistency: Every read receives the most recent write or an error
  • Availability: Every request receives a (non-error) response, without the guarantee that it contains the most recent write
  • Partition tolerance: The system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes

我全都要

One workaround for the CAP theorem, which I believe was pioneered at Amazon but which is now widespread, is to go for “eventual consistency”: maintain an AP system, but ensure that every update to a given datum is eventually propagated to every node that needs to know about it.

什么是CRDTs?

In distributed computing, a conflict-free replicated data type (CRDT) is a data structure which can be replicated across multiple computers in a network, where the replicas can be updated independently and concurrently without coordination between the replicas, and where it is always mathematically possible to resolve inconsistencies that might come up.

Go to Math

群(group): G为非空集合,如果在G上定义的二元运算 *,满足

  • 封闭性(Closure):对于任意a,b∈G,有a*b∈G
  • 结合律(Associativity):对于任意a,b,c∈G,有(ab)c=a(bc)
  • 幺元 (Identity):存在幺元e,使得对于任意a∈G,ea=ae=a
  • 逆元:对于任意a∈G,存在逆元a^-1,使得a^-1a=aa^-1=e

则称(G,*)是群,简称G是群。

如果仅满足封闭性和结合律,则称G是一个半群(Semigroup);如果仅满足封闭性、结合律并且有幺元,则称G是一个含幺半群(Monoid)。

???

Show me the code, now!

trait SemiGroup[T]:
  extension (x: T) def combine (y: T): T

trait Monoid[T] extends SemiGroup[T]:
  def unit: T

object Monoid:
  def apply[T](using m: Monoid[T]) = m

Hello, Monoid

given Monoid[Int]:
  extension (x: Int) def combine (y: Int): Int = x + y
  def unit: Int = 0

1 combine 2

def combineAll[T: Monoid](xs: List[T]): T =
  xs.foldLeft(Monoid[T].unit)(_.combine(_))

combineAll(List(1, 2, 3))

Let’s get started!

type Result = Map[String,Int]


given Monoid[Result]:
  extension (x: Result) def combine (y: Result): Result = 
      (x.toList ::: y.toList).groupBy(_._1).map {
        case (k, v) => (k, (v.map(_._2).reduce(_ combine _)))
      }.toMap
  def unit: Result = Map.empty
val left = Map("hello" -> 1, "monoid" -> 2)
val right = Map("hello" -> 1, "scala" -> 3)
scala> left combine right
val res1: Result = HashMap(monoid -> 2, scala -> 3, hello -> 2)

Thanks