kafka基础

基础

是什么

kafka是一个消息中间件,具有分布式、可分区、可备份的日志服务,它使用独特的设计实现了一个消息系统的功能。

设计架构

broker向zookeeper注册,通过zk选举leader,同时当consumer group发生变化时进行rebalance

producer向zk发送请求,拿到broker然后基于topic发送一个主题message,producer到broker是push

消费指定topic的一组consumer pull到消息,均衡消费

组成部分

  • Topic:特指Kafka处理的消息源的不同分类,其实也可以理解为对不同消息源的区分的一个标识;
  • Partition:Topic物理上的分组,一个topic可以设置为多个partition,每个partition都是一个有序的队列,partition中的每条消息都会被分配一个有序的id(offset);
  • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发送一些消息;
  • Producers:消息和数据生产者,向Kafka的一个topic发送消息的过程叫做producers(producer可以选择向topic哪一个partition发送数据);
  • Consumers:消息和数据消费者,接收topics并处理其发布的消息的过程叫做consumer,同一个topic的数据可以被多个consumer接收;
  • Producers:消息和数据生产者,向Kafka的一个topic发送消息的过程叫做producers(producer可以选择向topic哪一个partition发送数据);
  • Broker:缓存代理,Kafka集群中的一台或多台服务器统称为broker。

作用

系统间解耦、缓存实时数据用于离线处理等

API使用

1、内置的sacla版本客户端

maven依赖

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.11</artifactId>
   <version>0.10.2.1</version>
</dependency>

2、新版本kafka客户端

maven 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>

 

3、spring集成客户端

maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>

生产者spring配置

<!--配置kafka参数-->
<bean id="agreementProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${kafka.server.port}"/>
            <entry key="retries" value="0"/>
            <entry key="batch.size" value="16384"/>
            <entry key="linger.ms" value="1"/>
            <entry key="request.timeout.ms" value="1000"/>
            <entry key="buffer.memory" value="33554432"/>
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
        </map>
    </constructor-arg>
</bean>
 
<!--创建bean工厂-->
<bean id="agreementFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="agreementProperties"/>
</bean>
<!--生成kafka操作客户端-->
<bean id="agreementKafka" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="agreementFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
</bean>

生产者代码:

@Service("kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProducerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class);
    @Resource(name = "agreementKafka")
    private KafkaTemplate<Integer, String> agreementKafka;
   @Override
    public void createContractKfkMsg(String topic, String msg) {
    logger.info("贷中合同创建进件010协议");
    long startTime = System.currentTimeMillis();
    //发送kafka消息
    ListenableFuture<SendResult<Integer, String>> listenableCallBack = agreementKafka.send(topic, msg);
    SuccessCallback<SendResult<Integer, String>> successCallback = (SendResult<Integer, String> result) -> {
        if (result != null && result.getRecordMetadata() != null) {
            logger.info(
                    "send the msg successfully, topic->{}, partition->{}, offset->{}, " +
                            "timestamp->{},serializedKeySize->{},serializedValueSize->{}",
                    topic, result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset(),
                    result.getRecordMetadata().timestamp(), result.getRecordMetadata().serializedKeySize(),
                    result.getRecordMetadata().serializedValueSize());
        }
    };
    FailureCallback failureCallback = (Throwable ex) -> logger.info("[topic = {}, msg = {}], onFailure:", topic, msg
            , ex);
    listenableCallBack.addCallback(successCallback, failureCallback);
    logger.info("kafka producer send msg ,time is {}", System.currentTimeMillis() - startTime);
   }
}
消费者spring配置

 

<bean id = "agreementActivatedConsumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="zookeeper.connect" value="${zookeeper.connect}"/>
            <entry key="bootstrap.servers" value="${kafka.server.port}"/>
            <entry key="group.id" value="lease.templar.agreement.activated.group" />
            <entry key="enable.auto.commit" value="true" />
            <entry key="auto.commit.interval.ms" value="1000" />
            <entry key="session.timeout.ms" value="15000" />
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>
<bean id="agreementActivatedConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <ref bean="agreementActivatedConsumerProperties"/>
    </constructor-arg>
</bean>
<bean id="agreementContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="${lease.agreement.activated.topic}" />
    <property name="messageListener" ref="agreementActivatedListenService" />
</bean>
<bean id="agreementListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
      init-method="doStart">
    <constructor-arg ref="agreementActivatedConsumerFactory" />
    <constructor-arg ref="agreementContainerProperties" />
</bean>

 

 消费者代码

 

@Service("agreementActivatedListenService")
@Slf4j
public class AgreementActivatedConsumerService implements MessageListener<Integer, String>{
    public static  final String AGREEMENT_REFUSE_PREFIX = "AGREEMENT_SERIAL_NO_";
    @Value("${lease.agereement.activated.state.callback.url}")
    private String callbackUrl;
    @Resource
    private PropertiesResource propertiesResource;
    @Override
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
        JSONObject jsonObj = JSONObject.parseObject(consumerRecord.value());
        if(jsonObj != null){
            log.info("消费到010协议的消息 {}", jsonObj.toJSONString());
            AgreementActivateMesssageBo agreementActivateMesssageBo = jsonObj.toJavaObject(AgreementActivateMesssageBo.class);
            String serialNo;
            if((serialNo = agreementActivateMesssageBo.getSerialNo()) != null){
                RedisTemplate redisTemplate = SpringContextUtil.getBean("redisTemplate", RedisTemplate.class);
                String value = (String)redisTemplate.opsForValue().get(AGREEMENT_REFUSE_PREFIX + serialNo);
                log.info("redis get value = {}" ,value);
                if(serialNo.equals(value)){ //匹配有效单
                    //回调阿里云服务 TODO 处理下
                    try{
                        Map<String, String> params = new HashMap<>();
                        params.put("data", DESedeHelper.encrypt(jsonObj.toJSONString(), propertiesResource.securityClfLeaseCommonSecretKey));
                        String res = HttpUtils.doPost(callbackUrl, params, "utf-8");
                        log.info("回调结果==" + res);
                    }catch (Exception ex){
                        log.error("callback error", ex);
                    }
                }
            }
        }
    }
}

 

继续阅读

发表在 SEO

京汨科技 引领微创新

在众多的个性化Web服务中“京汨快站”,无疑是一个web个性化的代表,自推出以来,很快就吸引了大批拥护。“京汨快站”将“个性化Web建站”的旗号将大众带入“智能化时代”的建站大潮中。

“个性化Web建站”的市场到底有多大?什么样的产品才更容易获得用户青睐?针对这些问题,我们专访了“京汨科技”的研发负责人何小伟先生。

继续阅读

发表在 SEO

何小伟 熊掌号搜索名片/基础/高级长啥样?

问:熊掌号搜索名片设置功能于9月6号上线,符合条件熊掌号可自定义配置搜索名片。那熊掌号搜索名片长啥样?后台如何设置操作与管理呢?

答:首先,熊掌号搜索名片长成这样。如下图:

熊掌号搜索名片是熊掌号在百度移动搜索结果中的展现样式,分为基础名片和高级名片,当用户使用移动端百度搜索『“熊掌号名称”+熊掌号』时,可展现熊掌号运营者设置的搜索名片样式。

【基础名片】

基础名片用于展现熊掌号LOGO、名片、签名等基础信息,用户可通过基础名片直接点击关注熊掌号。基础名片无需设置,开通熊掌号即拥有基础名片展现权益。

【高级名片】

高级名片支持精选导航、文章、商品三个栏目配置,配置高级名片需同时满足以下三个条件 ①完成真实性认证;②熊掌号指数>=100③熊掌号粉丝数>=1000。

设置操作方法如下:

一、基础名片

1.功能介绍

基础名片用于展现熊掌号LOGO、名片、签名等基础信息,用户可通过基础名片直接点击关注熊掌号。

2.申请条件

基础名片无需设置,开通熊掌号即拥有基础名片展现权益。

3.配置效果

基础名片:

二、高级名片

1.功能介绍

高级名片支持精选导航、文章、商品三个栏目展现,符合条件的熊掌号可自定义配置。

2.申请条件

需同时满足以下三个条件:a.完成真实性认证;b.熊掌号指数>=100;c.熊掌号粉丝数>=1000

3.配置效果

高级名片:

4.操作流程

4.1登陆熊掌号平台,点击左侧菜单栏“添加功能”,添加“搜索名片设置“插件

添加完成后“搜索名片设置”会在首页展示,点击进“搜索名片设置“进入设置页面

4.2熊掌号运营者可根据实际需求添加栏目进行搜索名片设置,支持精选导航、文章、商品三个栏目

4.2.1高级名片-“精选导航”栏目设置当前仅支持添加4个或6个链接,点击“选择链接”进入选择链接界面;

链接信息同步自“主页装修图标菜单设置”,熊掌号运营者可在复选框内根据实际需求,选择需要设置的“精选导航“栏目内容,将展现在熊掌号搜索名片-高级名片中;(添加/修改链接信息需要进入主页装修-图标菜单设置页面进行修改或通过图中“主页装修图标菜单设置”跳转到主页装修-图标菜单设置页面进行添加/修改)

“精选导航”展现内容选自于图标菜单内容,如果图标菜单为空,首先需要设置图标菜单。图标菜单设置中,图标名称与功能描述不可为相同字段(两者均展现在精选导航中),如果两者填写内容相同,审核不予通过。当图标菜单超过4个时,功能描述内容将不会出现在号主页,仅用于搜索名片设置。

“精选导航“栏目支持删除及拖动排序操作,设置完成后点击发布即可

4.2.2高级名片-“文章”栏目设置:

高级名片-文章栏目内容同步自熊掌号主页文章栏目。高级名片-文章栏目内容不可编辑,仅做展现。若文章数在6篇以内,则有几篇展现几篇;若文章数超过6篇,则展现6篇。数量小于3个时不做展示。(只有经过搜索结果出图代码改造且图片至少为1图的文章才可以在文章栏目中展现)

4.2.3高级名片-“商品”栏目设置:

商品栏目内测中,暂不支持配置。

4.3高级名片-“精选导航”、“最新文章”、”商品“设置完成后将在搜索名片中的高级名片中展示,左右均可点击切换栏目

继续阅读