4.scala 高级部分

教程 DER ⋅ 于 2023-02-10 19:59:03 ⋅ 最后回复由 刘硕 2023-06-09 22:13:09 ⋅ 1585 阅读

18 高阶函数

在数据和计算中,高阶函数是至少满足下列一个条件的函数:

1)接受一个或多个函数作为输入

2)输出一个函数

输出一个函数

// 输出Int类型
scala> def add(a:Int, b:Int) = a + b
add: (a: Int, b: Int)Int
// 输出函数Int => Int
scala> def add(a:Int, b:Int) = (c:Int) => a + b + c
add: (a: Int, b: Int)Int => Int
// 使用时需要传入多个括号的数据
scala> add(1,2)
res0: Int => Int = <function1>
scala> res0(3)
res1: Int = 6
scala> add(1,2)(3)
res2: Int = 6
// 输出函数 Int => (Int => Int)
scala> def add(a:Int, b:Int) = (c:Int) => (d:Int) => a + b + c + d
add: (a: Int, b: Int)Int => (Int => Int)
scala> add(1,2)(3)(4)
res3: Int = 10

传入的是一个函数

传入函数 (Int,Int) => Int

// 输入参数是 函数 (Int,Int)=>Int
scala> def js(func:(Int,Int)=>Int) = func
js: (func: (Int, Int) => Int)(Int, Int) => Int
scala> val func1 = (a:Int, b:Int) => a + b
func1: (Int, Int) => Int = <function2>
scala> val func2 = (a:Int, b:Int) => a * b
func2: (Int, Int) => Int = <function2>
scala> js(func1)
res6: (Int, Int) => Int = <function2>
scala> res6(1,2)
res7: Int = 3
scala> js(func1)(1,2)
res8: Int = 3
scala> js(func2)(1,2)
res9: Int = 2

传入函数 Int => Int

scala> def js(func:(Int)=>Int) = func
js: (func: Int => Int)Int => Int
scala> val func1 = (a:Int) => a + 10
func1: Int => Int = <function1>
scala> val func2 = (a:Int) => a * 10
func2: Int => Int = <function1>
scala> js(func1)(1)
res10: Int = 11
scala> js(func2)(1)
res11: Int = 10

在上面的基础上,在集合里用函数

scala> def calculate(a:Int,b:Int,f:(Int,Int)=>Int) = f(a,b)
calculate: (a: Int, b: Int, f: (Int, Int) => Int)Int

scala> calculate(10,20,(a,b)=>a+b)
res15: Int = 30

scala> def getFunc(a:Int,b:Int)={
     | val func = (x:Int,y:Int)=> a*x +b*y
     | func
     | }
getFunc: (a: Int, b: Int)(Int, Int) => Int

scala> calculate(10,20,getFunc(1,2))
res16: Int = 50

scala> val arr = Array(1,2,3,4,5,6,7)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

scala> arr.reduce((a,b)=> getFunc(1,2)(a,b))
res19: Int = 55

19 部分参数函数

如果函数传递所有预期的参数, 则表示已完全应用它。 如果只传递几个参数并不是全部参数, 那么将返回部分应用的函数。 这样就可以方便地绑定一些参数, 其余的参数可稍后填写补上;

scala> def add(a:Int, b:Int) = a + b
add: (a: Int, b: Int)Int
scala> add(1,2)
res17: Int = 3
// 部分参数就是 固定一部分参数,传入一部分参数
// 方式1:
// 使用时,用_:Int 做参数占位,用于传入参数 
scala> val func1 = add(1, _:Int)
func1: Int => Int = <function1>
scala> func1(2)
res20: Int = 3
// 方式2
// 利用函数,固定一部分参数,传入一部分参数
scala> val func2 = (b:Int) => add(1, b)
func2: Int => Int = <function1>
scala> func2(2)
res21: Int = 3

默认值参数

scala> val add = (x:Int,y:Int) => x+y
add: (Int, Int) => Int = $Lambda$1488/999610765@2da20e8a

scala> val add1 = add(10,_:Int)
add1: Int => Int = $Lambda$1489/652615485@41ff6f38

scala> add1(20)
res20: Int = 30

scala> val add2 = (x:Int) => add(10,x)
add2: Int => Int = $Lambda$1490/554602748@3b47adeb

scala> add2(50)
res21: Int = 60

scala> def add(x:Int,y:Int=20) = x+y
add: (x: Int, y: Int)Int

scala> add(10)
res22: Int = 30

scala> add(10,40)
res23: Int = 50

scala> def circle(r:Int,pi:Double = 3.14) = 2*r* pi
circle: (r: Int, pi: Double)Double

scala> circle(2)
res24: Double = 12.56

scala> circle(2,3.1415926)
res25: Double = 12.5663704

20 柯理化(颗粒化)

柯里化(Currying)是把接受多个参数的函数变换成接受一个单一参数(最初函数的第一个参数)的函数,并且返回接受余下的参数且返回结果的新函数的技术。

是把接受多个参数的函数变成接受一个参数的函数;

柯理化的两种表现形式:

以 加法函数为例:

def curring(x:Int)(y:Int) = x + y

def curring(x:Int) = (y:Int) => x + y

在柯理化形式的基础上,固定一个参数,传入一个参数

scala> def curring(x:Int)(y:Int) = x + y
curring: (x: Int)(y: Int)Int
scala> curring(4)(5)
res23: Int = 9
// 第一种方式
scala> val func1 = curring(5)_
func1: Int => Int = <function1>
scala> func1(4)
res25: Int = 9
// 第二种方式
scala> val func2 = (x:Int) => curring(x)(5)
func2: Int => Int = <function1>
scala> func2(4)
res26: Int = 9
scala> def curring(x:Int) = (y:Int) => x + y
curring: (x: Int)Int => Int

柯里化的演示

scala> arr.fold(0)(_+_)
res26: Int = 28

scala> arr
res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

scala> def add(x:Int,y:Int) = x+y
add: (x: Int, y: Int)Int

scala> def add(x:Int)(y:Int) = x+y
add: (x: Int)(y: Int)Int

scala> val add1 = (a:Int) => add(a)_
add1: Int => (Int => Int) = $Lambda$1518/1973590042@11a24a1a

scala> add1(10)
res28: Int => Int = $Lambda$1519/2136804728@677e4612

scala> res28(20)
res29: Int = 30

scala> val add2 = (a:Int) => add(a)(10)
add2: Int => Int = $Lambda$1520/1063482862@1411e3e4

scala> add2(100)
res30: Int = 110

scala> 

scala> def add(x:Int)(y:Int)()()()()(z:Int) = x+y+z
add: (x: Int)(y: Int)()()()()(z: Int)Int

scala> add(10)(20)()()()()(30)
res31: Int = 60

柯里化分类

//默认值参数可以分类
scala> def circle(r:Int)(pi:Double=3.14) = {
     | 2*r*pi
     | }
circle: (r: Int)(pi: Double)Double

柯里化函数,配合implicit关键字使用

// 定义带有隐式参数的add方法
// implicit 修饰参数,代表该参数是隐式参数
scala> def add(a:Int)(implicit b:Int = 5) = a + b
add: (a: Int)(implicit b: Int)Int
// 直接传参
scala> add(4)(5)
res22: Int = 9
scala> add(4)(10)
res24: Int = 14
// 执行时,首先找当前环境是否有和隐式参数类型相同的隐式值,如果找到,用隐式值作为隐式参数
// 如果没找到,看隐式参数是否有默认值,如果有,使用默认值
// 如果还没找到,那就抛异常

// 当前环境没有和隐式参数类型相同的隐式值,隐式参数有默认值,使用默认值
// 4 + 5(默认值)
scala> add(4)
res25: Int = 9
// 定义隐式值
scala> implicit val b1:Int = 20
b1: Int = 20
// 当前环境有和隐式参数类型相同的隐式值,使用隐式值
// 4 + 20(隐式值)
scala> add(4)
res26: Int = 24
// 通过 implicitly[Int] 可提取出当前环境的隐式值并赋给变量
scala> val c:Int = implicitly[Int]
c: Int = 20
// 定义String类型隐式值
scala> implicit val b2:String = "aa"
b2: String = aa
scala> add(4)
res27: Int = 24
// 定义 Int类型隐式值, 当前环境有两个Int类型的隐式值
scala> implicit val b3:Int = 30
b3: Int = 30
// 由于当前环境有两个Int类型的隐式值,调用时不知道找哪个,所以报错
scala> add(4)
<console>:16: error: ambiguous implicit values:
 both value b1 of type => Int
 and value b3 of type => Int
 match expected type Int
       add(4)
          ^
// 由于当前环境已乱套,可通过退出重进解决
scala> :quit
C:\Users\My>scala
Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.
scala>

在开发环境中使用柯里化和implicit,切记不能在同一个类中使用

package util
// 隐式成员是单独放在一个地方,使用的时候引入
object MyPredef {
  // 定义隐式值1
  implicit val b1:Int = 10
  // 定义隐式值2
  implicit val b2:Int = 20
}

package day03
object CurringDemo {
  // 定义带有隐式参数的方法
  def add(a:Int)(implicit b:Int = 5) = a + b
  def main(args: Array[String]): Unit = {
    println(add(4)(1))
    println(add(4))
    // 引入隐式值之后,当前环境就有隐式值了
    import util.MyPredef.b1
    println(add(4))
// 当前环境有两个Int类型隐式值,报异常
// import util.MyPredef.b2
// println(add(4))
  }
}

21 隐式转换

作用:能够丰富现有类库的功能,对类的方法进行增强,常用作类型转换也就是把一个类型转赋予另一个类的功能;

隐式转换应用场景

隐式转换函数、隐式值、隐式参数、隐式对象(只能在静态对象中使用);

21.1 隐式转换函数

隐式转换函数 是指 以implicit关键字声明并带有单个参数的函数,这样的函数被自动应用。

// 声明隐式函数,用于将 int类型转成 String 类型
implicit val int2Stringfunc = (a:Int) => a.toString

使用示例:

// 定义Int类型
scala> val a = 12345
a: Int = 12345
// Int类型没有 length 方法
scala> a.length
<console>:13: error: value length is not a member of Int
       a.length
         ^
// 定义隐式转换函数 (Int)=>String
scala> implicit def int2string(a:Int) = a.toString
warning: there was one feature warning; re-run with -feature for details
int2string: (a: Int)String
// 当执行时,看Int类型是否有length成员,如果有直接调用
// 如果没有,但当前环境是否有隐式函数,能将Int转成有该length成员的类型,如果有则调用
// 执行过程: Int --> String --> 调用String类型的length方法
scala> a.length
res2: Int = 5

scala内部自带了很多隐式转换函数和方法。如1 to 10其实是调用的1.to(10)这个方法

但是在Int类中并没有to这个方法

file

file

file

intWrapper就是以implicit关键字声明并带有单个参数的函数,intWrapper就是一个隐式转换方法;

用于scala 和 java 类型互转

scala> val a:Int = 1
a: Int = 1
// 将 scala 的 Int类型 赋给 java 的 Integer 类型
scala> val b:Integer = a
b: Integer = 1
// 将 java 的 Integer 类型 赋给 scala 的 Int类型
scala> val c:Int = b
c: Int = 1

file

predef这个类就是预定义的predefine的简写

在shell中用:implicit -v来查看,默认有多少个隐式转换函数

在2.12.16中有65个隐式转换,scala升级比较快所以其它版本可能不同

file

隐式转换函数其实用到了装饰模式(增强)

核心思想:用组合实现嵌套逻辑,每层有自己的个性化动作。在不修改原有类的基础上,给原有类增加功能。

file

file

21.2 隐式参数和隐式值

在调用含有隐式参数的函数时,编译器会自动寻找合适的隐式值当做隐式参数,而只有用implict标记过的值、对象、函数才能被找到。

def add(x:Int)(implicit y:Int = 10) = x + y // 参考 柯理化 隐式关键字例子

隐式参数注意:方法的参数如果有多个隐式参数的话,只需要使用一个implicit关键字,隐式参数列表必须放在方法的参数列表后面;

示例:

scala> def demo(a:Int)(implicit b:Int, c:Int) = a + b + c
demo: (a: Int)(implicit b: Int, implicit c: Int)Int
scala> implicit val b1:Int = 10
b1: Int = 10
// 一个隐式值作用在多个隐式参数上
scala> demo(5)
res5: Int = 25

隐式函数作为隐式参数:

    // 定义隐式函数
    implicit val int2stringfunc = (a:Int) => a.toString

    def sayLength(implicit str:String) = println(s"${str.length}")
    sayLength(1234)

21.3 隐式对象

比较器

/**
 * comparable comparator
 * 比较器接口comparable
 * 没有比较器就临时new一个new comparator
 * ordered  ordering
 */
object TestObject {
  def main(args: Array[String]): Unit = {
    val s1 = new Actor("liudehua",95)
    val s2 = new Actor("reba",96)
    val s3 = new Actor("nazha",97)
    val arr = Array(s1,s2,s3)
    arr.sorted(new Ordering[Actor] {
      override def compare(x: Actor, y: Actor): Int = y.fv - x.fv
    }).foreach(t=>println(t.name,t.fv))
  }
}
class Actor(val name:String,val fv:Int) /*extends Ordered[Actor]{
  override def compare(that: Actor): Int = {
    this.fv - that.fv
  }
}*/

隐式对象只能在别的trait/类/对象内部定义。

并且隐式对象在一些方法中需要使用

object MyOrder{
  implicit object MyActorOrdering extends Ordering[Actor]{
    override def compare(x: Actor, y: Actor): Int = y.fv - x.fv
  }
}
object TestObject {
  def main(args: Array[String]): Unit = {
    val s1 = new Actor("liudehua",95)
    val s2 = new Actor("reba",96)
    val s3 = new Actor("nazha",97)
    val arr = Array(s1,s2,s3)
    import MyOrder.MyActorOrdering
    arr.sorted.foreach(t=>println(t.name,t.fv))
    println(implicitly[MyOrder.MyActorOrdering.type ])
  }
}
class Actor(val name:String,val fv:Int)

隐式转换实现排序

object MyOrder{
  implicit object MyActorOrdering extends Ordering[Actor]{
    override def compare(x: Actor, y: Actor): Int = y.fv - x.fv
  }
}
object TestObject {
  def main(args: Array[String]): Unit = {
    val s1 = new Actor("liudehua",95)
    val s2 = new Actor("reba",96)
    val s3 = new Actor("nazha",97)
    val arr = Array(s1,s2,s3)
    import MyOrder.MyActorOrdering
    arr.sorted.foreach(t=>println(t.name,t.fv))
    println(implicitly[MyOrder.MyActorOrdering.type ])
  }
}
class Actor(val name:String,val fv:Int)

21.4 隐式转换的应用示例

1)类型转换

object ImplicitDemo {
  /**
    * 定义了一个隐式转换的方法
    */
  implicit def double2Int(d:Double) = {
    println(s"double:${d} to int:${d.toInt} method")
    d.toInt
  }
  /**
    * 定义了一个隐式转换的函数
    */
  implicit val double2IntF = (d:Double) => {
    println(s"double:${d} to int:${d.toInt} function")
    d.toInt
  }
  def m1(a:Int) = println(a)
  def main(args: Array[String]): Unit = {
    //当隐式转换方法和隐式转换函数同时存在,也就是入参,返回类型相同的情况下,则先找函数,因为scala是函数式编程,那函数就是老大
    //如果函数和方法同时存在,就优先用函数
    val d:Int = 6.6
    m1(6.6)
  }
}

2)给代表文件地址的字符串增加一个可以读文件的功能

这是一个显示的调用并不是一个隐式的调用,这是我们平时开发过程中常用的方法

package day04
import scala.io.Source
class Reader(val path:String) {
  // 读取文件得到文件内容返回
  def read = Source.fromFile(path).mkString
}
object Reader{
  def main(args: Array[String]): Unit = {
    val path:String = "/tmp/scala/ip.txt"
    val reader = new Reader(path)
    println(reader.read)
  }
}

隐式转换函数的实现方法

首先在MyPredef写一个String的隐式转换函数;

file

package day04
import scala.io.Source
class Reader(val path:String) {
  // 读取文件得到文件内容返回
  def read = Source.fromFile(path).mkString
}
object Reader{
  def main(args: Array[String]): Unit = {
    val path:String = "/tmp/scala/ip.txt"
// val reader = new Reader(path)
    // 通过隐式转换函数给字符串赋予读文件的功能(String => Reader)
    import util.MyPredef.string2Reader
    println(path.read)
  }
}

22 泛型

泛型就是不确定的类型,可以在类或方法不确实传入类型时使用,可以提高代码的灵活性和复用性;

scala中泛型的用法和java中差不多,但是会有一些自己独特的语法;

泛型类:指定类可以接受任意类型参数。

泛型方法:指定方法可以接受任意类型参数。

22.1 泛型类基本用法

package day04
import day04.SexEnumObj.SexEnum
// 定义带有泛型的抽象类
abstract class FXDemo[T](val t : T) {
  def printInfo():Unit
}
// 子类继承父类,把泛型具体化成Int
class FXIntDemo[Int](t : Int) extends FXDemo[Int](t:Int){
  override def printInfo(): Unit = {
    println(s"FXIntDemo[Int](${t})")
  }
}
// 子类继承父类,把泛型具体化成String
class FXStringDemo[String](t : String) extends FXDemo[String](t:String){
  override def printInfo(): Unit = {
    println(s"FXIntDemo[String](${t})")
  }
}
// 定义带有多泛型的类
class FXABCDemo[A, B, C](val a:A, val b:B, val c:C){
  override def toString: String = s"${a}, ${b}, ${c}"
}
// 定义sex的枚举对象
object SexEnumObj extends Enumeration{
  // 定义枚举类型(用于泛型)
  type SexEnum = Value
  // 定义枚举值
  val boy, girl = Value
}
object FXDemo{
  def main(args: Array[String]): Unit = {
    val demo = new FXIntDemo[Int](100)
    demo.printInfo()
    val demo2 = new FXStringDemo[String]("xiaoming")
    demo2.printInfo()
    val demo3 = new FXABCDemo[String, Int, String]("xiaoming", 20, "boy")
    println(demo3)
    val demo4 = new FXABCDemo[String, Int, SexEnum]("xiaoming", 20, SexEnumObj.boy)
    println(demo4)
  }
}

22.2 泛型种类

[B \<: A] UpperBound 上界,B类型的父类是A类型,左侧的B的类型必须是A类型或者A类型的子类。

[B >: A] LowerBound 下界,B类型的子类是A类型,左侧的B的类型必须是A类型或者A类型的父类。

[-A] 逆变,通常作为参数类型,T是A的子类。

[+B] 协变,通常作为返回类型,T是B的父类。

22.2.1 UpperBound

UpperBound 用在泛型类或泛型方法上,用于限制传递的参数必须是 指定类型对象或其子类对象。

如果想实现两个对象的比较,需要该对象实现Comparable接口。然后再配上泛型实现通用比较。

泛型继承,java的用法

package javaday04;
public class UpperBoundDemo<T extends Comparable<T>> {
    // psvm
    public static void main(String[] args) {
        // Integer 实现了 Comparable接口,创建对象时,约束通过
        UpperBoundDemo<Integer> demo1 = new UpperBoundDemo<Integer>();
        // Hainiu 没实现 Comparable接口,创建对象时,约束不通过
// UpperBoundDemo<Hainiu> demo2 = new UpperBoundDemo<Hainiu>();
        // 约束通过
        UpperBoundDemo<HainiuComparable> demo3 = new UpperBoundDemo<HainiuComparable>();
    }
}
class Hainiu{}
class HainiuComparable implements Comparable<HainiuComparable>{
    public int compareTo(HainiuComparable o) {
        return 0;
    }
}

泛型继承,scala用法

scala> class A
defined class A

scala> class B extends A
defined class B

scala> class C extends B
defined class C

scala> class Message[T:<B](val msg:T)
<console>:1: error: ']' expected but identifier found.
       class Message[T:<B](val msg:T)
                      ^

scala> class Message[T<:B](val msg:T)
defined class Message

scala> val m = new Message[B](new B)
m: Message[B] = Message@6fb94c36

scala> val m = new Message[A](new A)
<console>:13: error: type arguments [A] do not conform to class Message's type parameter bounds [T <: B]
       val m = new Message[A](new A)
           ^
<console>:13: error: type arguments [A] do not conform to class Message's type parameter bounds [T <: B]
       val m = new Message[A](new A)
                   ^

scala> val m = new Message[C](new C)
m: Message[C] = Message@43ec61f0

22.2.2 LowerBound

LowerBound 用在泛型类或泛型方法上,用于限制传递的参数必须是 指定类型对象或其父类对象。

scala> class Message[T>:B](val msg:T)
defined class Message

scala> new Message[B](new B)
res12: Message[B] = Message@66b98075

scala> new Message[A](new A)
res13: Message[A] = Message@6bde050d

scala> new Message[C](new C)
<console>:13: error: type arguments [C] do not conform to class Message's type parameter bounds [T >: B]
       val res14 =
           ^
<console>:14: error: type arguments [C] do not conform to class Message's type parameter bounds [T >: B]
       new Message[C](new C)
           ^

22.2.3 协变与逆变

在声明Scala的泛型类型时,“+”表示协变(covariance),而“-”表示逆变(contravariance)。

C[+T]:如果A是B的子类,那么C[A]是C[B]的子类;通常作为返回值类型。

C[-T]:如果A是B的子类,那么C[B]是C[A]的子类;通常作为参数类型。

C[T]:无论A和B是什么关系,C[A]和C[B]没有从属关系。

file

scala> class A
defined class A

scala> class B extends A
defined class B

scala> class C extends B
defined class C

scala> def func(f:B=>B)=f
func: (f: B => B)B => B

scala> func((b:B)=>new B)
res0: B => B = <function1>

scala> func((b:A)=>new B)
res1: B => B = <function1>

scala> func((b:C)=>new B)
<console>:16: error: type mismatch;
 found   : C => B
 required: B => B
       func((b:C)=>new B)
                 ^

scala> func((b:B)=>new A)
<console>:15: error: type mismatch;
 found   : A
 required: B
       func((b:B)=>new A)
                   ^

scala> func((b:B)=>new B)
res4: B => B = <function1>

scala> func((b:B)=>new C)
res5: B => B = <function1>

scala> func((b:A)=>new C)
res6: B => B = <function1>

23 Akka

24.1 Akka 概述

Spark的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现;

Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。

Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(MailBox)。

通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统。

Akka 具有如下特性:

1)提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发;

2)提供了异步非阻塞的、高性能的事件驱动编程模型;

3)超级轻量级事件处理(每GB堆内存几百万Actor);

24.2 Akka 组成及架构原理

ActorSystem

在Akka中,ActorSystem是一个重量级的结构。

ActorSystem 的职责是 负责创建并管理其创建的Actor,ActorSystem的单例的,一个JVM进程中有一个即可,而Actor是多例的。

Actor

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法

1)preStart()方法:该方法在Actor对象构造方法执行后执行,整 个Actor生命周期中仅执行一次, 就像 mapreduce里的 setup()

2)receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行, 就像mapreduce里的map()

每个actor 对象有对应的外部引用xxxRef,可以通过该 actor 对象的外部引用与actor通信。

akka的架构原理

file

mailbox负责存储actor收到的消息,dispatcher负责从mailbox取消息,分配线程给actor执行具体的业务逻辑。

sender引用代表最近收到消息的发送actor,通常用于回消息,比如 sender() !xxxx。

24.3 Akka 的使用

使用Akka需要增加这两个的pom依赖

<!-- 添加 akka 的 actor 依赖 -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.4.17</version>
</dependency>
<!-- 多进程之间的 Actor 通信 -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.12</artifactId>
    <version>2.4.17</version>
</dependency>

24.3.1 发送给自己

步骤:

1)创建ActorSystem

file

2)定义处理信息的Actor实现类

class HelleAkka extends Actor{
  //接受消息
  override def receive: Receive = {
    //接受消息的处理逻辑

  }
}

file

3)创建目标Actor的ActorRef对象

file

4)往目标Actor的ActorRef对象发送信息

class MyActor extends Actor{
  override def receive: Receive = {
    case msg => println(s"received ${msg}")
  }
}
object Test {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem.create("iphone13")
    val actor = system.actorOf(Props(new MyActor),"zhangsan")
    actor ! "this is first message"
  }
}

24.3.2 发送给不同的进程

file

远端actorRef设置参数:

Id name age

akka.actor.provider = akka.remote.RemoteActorRefProvider

akka.remote.netty.tcp.hostname = \$host

akka.remote.netty.tcp.port = \$port

  1. 创建一个 siri端用于回复消息

siri

package com.hainiu.akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Siri extends Actor{
  override def receive: Receive = {
    case "hello" => sender() ! "hello"
    case "eat?" => sender() ! "yes"
    case "eat what?" => sender() ! "rice"
    case "teast good?" => sender() ! "very good"
    case "bye" => sender() ! "bye"
    case "learn bigdata?" => sender() ! "ok fine"
    case "come to hainiu" => sender() ! "ok fine"
    case _ => sender() ! "what???"
  }
}
object Siri {
  def main(args: Array[String]): Unit = {
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 6666
        |""".stripMargin
    val config = ConfigFactory.parseString(conf)
    val actorSystem = ActorSystem.create("iphone14",config)
    val actor = actorSystem.actorOf(Props(new Siri),"siri")
  }
}

启动后结果:

file

等待客户端发送消息,实现交互

  1. 创建一个 human端发送消息

human

package com.hainiu.akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

class Human extends Actor{
  override def receive: Receive = {
    case msg =>{
      println(s"siri : ${msg}")
      println("请输入和siri说的话:")
      val line = StdIn.readLine()
      sender()!line
    }
  }

  override def preStart(): Unit = {
    //初始化就执行
    println("请输入和siri说的话:")
    val line = StdIn.readLine()
    val proxy = context.actorSelection("akka.tcp://iphone14@localhost:6666/user/siri")
    proxy ! line
  }
}
object Human{
  def main(args: Array[String]): Unit = {
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 8888
        |""".stripMargin
    val config = ConfigFactory.parseString(conf)
    val actorSystem = ActorSystem.create("iphone13", config)
    actorSystem.actorOf(Props(new Human),"hainiu")
  }
}

运行效果:

file

24.3.3 akka模拟集群运行

worker端代码

package com.hainiu.scala

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

class Worker extends Actor{
  val worker_id = "worker-001"
  val cores = 30
  val memory = "64G"
  override def receive: Receive = {
    case RegistSuccess => {
      println("regist success!!!")
      //start heart beat!!!
      sendHeartBeat
    }
  }
  def sendHeartBeat()={
//    initialDelay: FiniteDuration  3s
//    interval: FiniteDuration  3s
//    receiver: ActorRef  master
//    message: Any 
    import context.dispatcher
    context.system.scheduler.schedule(Duration(3,TimeUnit.SECONDS),Duration(3,TimeUnit.SECONDS),sender(),HeartBeat(worker_id))
  }

  override def preStart(): Unit = {
    val regist = RegistClass(worker_id, System.currentTimeMillis(), cores, memory)
    val proxy = context.actorSelection("akka.tcp://nn1@localhost:8888/user/master")
    proxy ! regist
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 6666
        |""".stripMargin
    val config = ConfigFactory.parseString(conf)
    val actorSystem = ActorSystem.create("nn1",config)
    val actor = actorSystem.actorOf(Props(new Worker),"worker")
  }
}

master端代码

package com.hainiu.scala

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration

class Master extends Actor{
//  val buffer = ListBuffer[RegistClass]()
  val check_interval = 10
  val timeout = 30000
  var workers = mutable.Map[String,RegistClass]()
  override def receive: Receive = {
    case x:RegistClass =>{
      workers.put(x.workId,x)
      println(s"worker ${x.workId} is  registing... current workers ${workers.size} ")
      sender() ! RegistSuccess
    }
    case HeartBeat(workId) =>{
      workers.get(workId) match {
        case Some(v) =>{
          v.lastUpdateTime = System.currentTimeMillis()
          workers.put(workId,v)
          println(s"worker ${workId} is heart beating ...")
          println(s"current online workers is ${workers.size} !!!!")
        }
        case None =>{
          println(s"worker ${workId} is invalid ....")
        }
      }
    }
    case CheckTimeOut =>{
      if(workers.size>0){
        workers = workers.filter(tp=>{
          val work_id = tp._1
          val registInfo = tp._2
          if(System.currentTimeMillis() - registInfo.lastUpdateTime > timeout){
            println(s"worker ${work_id} is timeout,removed from master!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            false
          }else{
            true
          }
        })
      }
    }
  }

  override def preStart(): Unit = {
    import context.dispatcher
    context.system.scheduler.schedule(
      Duration(check_interval,TimeUnit.SECONDS),
      Duration(check_interval,TimeUnit.SECONDS),
      self,
      CheckTimeOut
    )
  }
}
object Master {
  def main(args: Array[String]): Unit = {
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 8888
        |""".stripMargin
    val config = ConfigFactory.parseString(conf)
    val actorSystem = ActorSystem.create("nn1",config)
    val actor = actorSystem.actorOf(Props(new Master),"master")
  }
}

公共类

package com.hainiu.scala

case class RegistClass(val workId:String,var lastUpdateTime:Long,val cores:Int,val memory:String)
case object RegistSuccess
case class HeartBeat(val workId:String)
case object CheckTimeOut

执行结果

file

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-DER,http://hainiubl.com/topics/76192
回复数量: 1
  • 刘硕 马云是我教出来的
    2023-06-09 22:13:09

    野牛老师也太厉害了吧

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter