【pyflink】pyflink扩展jar包开辟指南(Sink篇)
代码
1646 人阅读
|
0 人回复
|
<
1. 媒介
民圆关于pyflink的介绍太少了。
近来有效pyflink处置数据,终极需求sink到redis中,研讨有果,分享于人,以此为例,举一反三。
简朴介绍一下怎样为pyflink开辟jar包。
2. 阐发
我们的需求是正在python中可以参加sink,正在python中如许挪用。
- data_stream.add_sink(RedisSink("localhost", 6379, "result"))
复造代码 我们察看到add_sink吸取的参数是一个SinkFunction,那是一个java类,以是我们要用java去完成那个类,然后再正在python中引进,给到add_sink办法挪用。
正在java版本的Flink中,有一个曾经用RichSinkFunction完成的RedisSink,不言而喻RichSinkFunction是SinkFunction的子类,构建java没有是本文的重面,感爱好的能够参考以下那个包的源代码。
- package org.apache.flink.streaming.connectors.redis;
复造代码 java实践要做的是本身的完美,并返回python需求的类。
3. java代码
- package simple_redis_sink;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.connectors.redis.RedisSink;
- import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
- public class SimpleRedisSink {
- public RedisSink redisSink;
- // 处置数据,我那里是用的redis的hset,以是需求name, key, value三个参数,此中name我是看成种别(表名)去用的。
- public static final class RedisSinkExample implements RedisMapper<Tuple2<String, String>> {
- public String name;
-
- public RedisSinkExample(String name) {
- this.name = name;
- }
-
- # SinkRedis初初化时挪用,getCommandDescription,设置插进redis的方法。
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(RedisCommand.HSET, this.name);
- }
-
- // data是一个(key, value)的Tuple,invoke办法中掏出key
- public String getKeyFromData(Tuple2<String, String> data) {
- return data.f0;
- }
- // data是一个(key, value)的Tuple,invoke办法中掏出value
- public String getValueFromData(Tuple2<String, String> data) {
- return data.f1;
- }
- }
- // 初初化,那里将会是python挪用的进口,host、port、name是python传过去的
- public SimpleRedisSink(String host, int port, String name) {
- FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(host).setPort(port).build();
- this.redisSink = new RedisSink(conf, new SimpleRedisSink.RedisSinkExample(name));
- }
- // 返回RedisSink类
- public RedisSink get() {
- return this.redisSink;
- }
- }
复造代码 4. 编译天生jar
- Project structure->Artifacts->+->JAR->From modules with dependencies...
- 正在弹出的Create JAR from Modules对话框中,挑选extact to the target JAR一项。
- 正在output layout中面击根部,查抄一下Manifest File战Class Path能否设置准确。
Artifacts设置完成后,正在主菜单挑选Build->Build Artifacts...,弹出对话框当选择Build选项便可。
5. pyflink中利用jar包
起首需求正在正在env中减载jar包。
- env = StreamExecutionEnvironment.get_execution_environment()
- env.add_jars("file:///Users/bitekong/PycharmProjects/jyyc_dp_stream/simple_redis_sink.jar")
复造代码 正在python中编写py4j类,买通python取java的办法挪用。
- class RedisSink(SinkFunction):
- def __init__(self, host, port, name):
- gateway = get_gateway()
- j_redis_sink = gateway.jvm.simple_redis_sink.SimpleRedisSink(host, port, name).get()
- super(RedisSink, self).__init__(sink_func=j_redis_sink)
复造代码 接下去就能够正在data_stream中将方才写好的RedisSink类参加sink啦:
- data_stream.add_sink(RedisSink("localhost", 6379, "result"))
复造代码 免责声明:假如进犯了您的权益,请联络站少,我们会实时删除侵权内乱容,感谢协作! |
1、本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,按照目前互联网开放的原则,我们将在不通知作者的情况下,转载文章;如果原文明确注明“禁止转载”,我们一定不会转载。如果我们转载的文章不符合作者的版权声明或者作者不想让我们转载您的文章的话,请您发送邮箱:Cdnjson@163.com提供相关证明,我们将积极配合您!
2、本网站转载文章仅为传播更多信息之目的,凡在本网站出现的信息,均仅供参考。本网站将尽力确保所提供信息的准确性及可靠性,但不保证信息的正确性和完整性,且不对因信息的不正确或遗漏导致的任何损失或损害承担责任。
3、任何透过本网站网页而链接及得到的资讯、产品及服务,本网站概不负责,亦不负任何法律责任。
4、本网站所刊发、转载的文章,其版权均归原作者所有,如其他媒体、网站或个人从本网下载使用,请在转载有关文章时务必尊重该文章的著作权,保留本网注明的“稿件来源”,并自负版权等法律责任。
|
|
|
|
|